// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Threading;
namespace System.Reactive.Subjects
{
/// <summary>
/// Represents an object that is both an observable sequence as well as an observer.
/// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
/// </summary>
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
{
    #region Fields

    /// <summary>
    /// Underlying optimized implementation of the replay subject. Every public member of
    /// this class forwards to it.
    /// </summary>
    private readonly IReplaySubjectImplementation _implementation;

    #endregion

    #region Constructors

    #region All

    /// <summary>
    /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class.
    /// Delegates to the buffer-size constructor with <c>int.MaxValue</c>, which selects the
    /// implementation that never trims its buffer.
    /// </summary>
    public ReplaySubject()
        : this(int.MaxValue)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified scheduler.
    /// </summary>
    /// <param name="scheduler">Scheduler the observers are invoked on.</param>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    public ReplaySubject(IScheduler scheduler)
    {
        _implementation = new ReplayByTime(scheduler);
    }

    #endregion

    #region Count

    /// <summary>
    /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size.
    /// </summary>
    /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
    public ReplaySubject(int bufferSize)
    {
        //
        // Validate up front so the exception names the caller's parameter. Without this check a
        // negative size only failed inside the Queue<T> constructor, which reports "capacity".
        //
        if (bufferSize < 0)
            throw new ArgumentOutOfRangeException("bufferSize");

        switch (bufferSize)
        {
            case 1:
                // Single-element buffer: no queue needed.
                _implementation = new ReplayOne();
                break;
            case int.MaxValue:
                // Unbounded buffer: never trimmed.
                _implementation = new ReplayAll();
                break;
            default:
                _implementation = new ReplayMany(bufferSize);
                break;
        }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size and scheduler.
    /// </summary>
    /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
    /// <param name="scheduler">Scheduler the observers are invoked on.</param>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
    public ReplaySubject(int bufferSize, IScheduler scheduler)
    {
        _implementation = new ReplayByTime(bufferSize, scheduler);
    }

    #endregion

    #region Time

    /// <summary>
    /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified window.
    /// </summary>
    /// <param name="window">Maximum time length of the replay buffer.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
    public ReplaySubject(TimeSpan window)
    {
        _implementation = new ReplayByTime(window);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified window and scheduler.
    /// </summary>
    /// <param name="window">Maximum time length of the replay buffer.</param>
    /// <param name="scheduler">Scheduler the observers are invoked on.</param>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
    public ReplaySubject(TimeSpan window, IScheduler scheduler)
    {
        _implementation = new ReplayByTime(window, scheduler);
    }

    #endregion

    #region Count & Time

    /// <summary>
    /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size and window.
    /// </summary>
    /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
    /// <param name="window">Maximum time length of the replay buffer.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
    public ReplaySubject(int bufferSize, TimeSpan window)
    {
        _implementation = new ReplayByTime(bufferSize, window);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size, window and scheduler.
    /// </summary>
    /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
    /// <param name="window">Maximum time length of the replay buffer.</param>
    /// <param name="scheduler">Scheduler the observers are invoked on.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
    {
        _implementation = new ReplayByTime(bufferSize, window, scheduler);
    }

    #endregion

    #endregion

    #region Properties

    /// <summary>
    /// Indicates whether the subject has observers subscribed to it.
    /// </summary>
    public bool HasObservers
    {
        get { return _implementation.HasObservers; }
    }

    #endregion

    #region Methods

    #region Observer implementation

    /// <summary>
    /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
    /// </summary>
    /// <param name="value">The value to send to all observers.</param>
    public void OnNext(T value)
    {
        _implementation.OnNext(value);
    }

    /// <summary>
    /// Notifies all subscribed and future observers about the specified exception.
    /// </summary>
    /// <param name="error">The exception to send to all observers.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
    public void OnError(Exception error)
    {
        _implementation.OnError(error);
    }

    /// <summary>
    /// Notifies all subscribed and future observers about the end of the sequence.
    /// </summary>
    public void OnCompleted()
    {
        _implementation.OnCompleted();
    }

    #endregion

    #region IObservable implementation

    /// <summary>
    /// Subscribes an observer to the subject.
    /// </summary>
    /// <param name="observer">Observer to subscribe to the subject.</param>
    /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _implementation.Subscribe(observer);
    }

    #endregion

    #region IDisposable implementation

    /// <summary>
    /// Releases all resources used by the current instance of the <see cref="ReplaySubject{T}"/> class and unsubscribe all observers.
    /// </summary>
    public void Dispose()
    {
        _implementation.Dispose();
    }

    #endregion

    #endregion

    /// <summary>
    /// Contract shared by all buffering strategies the subject can delegate to.
    /// </summary>
    private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
    {
        bool HasObservers { get; }
    }

    /// <summary>
    /// Common subject logic (observer list, terminal state, disposal, locking). Derived classes
    /// supply the buffer through <see cref="Next"/>, <see cref="Replay"/> and <see cref="Trim"/>,
    /// and choose the scheduled observer type through <see cref="CreateScheduledObserver"/>.
    /// </summary>
    private abstract class ReplayBase : IReplaySubjectImplementation
    {
        private readonly object _gate = new object();

        private ImmutableList<IScheduledObserver<T>> _observers;
        private bool _isStopped;
        private Exception _error;
        private bool _isDisposed;

        public ReplayBase()
        {
            _observers = ImmutableList<IScheduledObserver<T>>.Empty;
            _isStopped = false;
            _error = null;
        }

        public bool HasObservers
        {
            get
            {
                // _observers is set to null by Dispose, hence the null check. Read once into a local.
                var observers = _observers;
                return observers != null && observers.Data.Length > 0;
            }
        }

        public void OnNext(T value)
        {
            var o = default(IScheduledObserver<T>[]);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    Next(value);
                    Trim();

                    // Hand the value to every scheduled observer while holding the gate...
                    o = _observers.Data;
                    foreach (var observer in o)
                        observer.OnNext(value);
                }
            }

            // ...and let them process it outside of the lock.
            if (o != null)
            {
                foreach (var observer in o)
                    observer.EnsureActive();
            }
        }

        public void OnError(Exception error)
        {
            if (error == null)
                throw new ArgumentNullException("error");

            var o = default(IScheduledObserver<T>[]);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    _isStopped = true;
                    _error = error;
                    Trim();

                    o = _observers.Data;
                    foreach (var observer in o)
                        observer.OnError(error);

                    // Terminal notification: the current observers are dropped from the list.
                    _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                }
            }

            if (o != null)
            {
                foreach (var observer in o)
                    observer.EnsureActive();
            }
        }

        public void OnCompleted()
        {
            var o = default(IScheduledObserver<T>[]);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    _isStopped = true;
                    Trim();

                    o = _observers.Data;
                    foreach (var observer in o)
                        observer.OnCompleted();

                    // Terminal notification: the current observers are dropped from the list.
                    _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                }
            }

            if (o != null)
            {
                foreach (var observer in o)
                    observer.EnsureActive();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
                throw new ArgumentNullException("observer");

            var so = CreateScheduledObserver(observer);

            // Number of notifications handed to the new observer during replay.
            var n = 0;

            var subscription = new Subscription(this, so);

            lock (_gate)
            {
                CheckDisposed();

                //
                // Notice the v1.x behavior of always calling Trim is preserved here.
                //
                // This may be subject (pun intended) of debate: should this policy
                // only be applied while the sequence is active? With the current
                // behavior, a sequence will "die out" after it has terminated by
                // continuing to drop OnNext notifications from the queue.
                //
                // In v1.x, this behavior was due to trimming based on the clock value
                // returned by scheduler.Now, applied to all but the terminal message
                // in the queue. Using the IStopwatch has the same effect. Either way,
                // we guarantee the final notification will be observed, but there's
                // no way to retain the buffer directly. One approach is to use the
                // time-based TakeLast operator and apply an unbounded ReplaySubject
                // to it.
                //
                // To conclude, we're keeping the behavior as-is for compatibility
                // reasons with v1.x.
                //
                Trim();

                _observers = _observers.Add(so);

                n = Replay(so);

                // Append the terminal notification, if the subject has already terminated.
                if (_error != null)
                {
                    n++;
                    so.OnError(_error);
                }
                else if (_isStopped)
                {
                    n++;
                    so.OnCompleted();
                }
            }

            so.EnsureActive(n);

            return subscription;
        }

        public void Dispose()
        {
            lock (_gate)
            {
                _isDisposed = true;
                _observers = null;
                DisposeCore();
            }
        }

        /// <summary>Releases the buffer held by the derived class; called under the gate.</summary>
        protected abstract void DisposeCore();

        /// <summary>Records a new value in the buffer; called under the gate.</summary>
        protected abstract void Next(T value);

        /// <summary>Sends the buffered values to the observer and returns how many were sent.</summary>
        protected abstract int Replay(IObserver<T> observer);

        /// <summary>Applies the buffer trimming policy; called under the gate.</summary>
        protected abstract void Trim();

        /// <summary>Wraps a subscribing observer in the scheduled observer used by this implementation.</summary>
        protected abstract IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer);

        private void CheckDisposed()
        {
            if (_isDisposed)
                throw new ObjectDisposedException(string.Empty);
        }

        private void Unsubscribe(IScheduledObserver<T> observer)
        {
            lock (_gate)
            {
                // After Dispose the observer list is null; nothing to remove.
                if (!_isDisposed)
                {
                    _observers = _observers.Remove(observer);
                }
            }
        }

        /// <summary>
        /// Handle returned from Subscribe; disposes the scheduled observer and removes it
        /// from the subject's observer list.
        /// </summary>
        private sealed class Subscription : IDisposable
        {
            private readonly ReplayBase _subject;
            private readonly IScheduledObserver<T> _observer;

            public Subscription(ReplayBase subject, IScheduledObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            public void Dispose()
            {
                _observer.Dispose();
                _subject.Unsubscribe(_observer);
            }
        }
    }

    /// <summary>
    /// Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
    /// </summary>
    private sealed class ReplayByTime : ReplayBase
    {
        private const int InfiniteBufferSize = int.MaxValue;

        private readonly int _bufferSize;
        private readonly TimeSpan _window;
        private readonly IScheduler _scheduler;
        private readonly IStopwatch _stopwatch;

        // Buffered values, each stamped with the stopwatch time at which it arrived.
        private readonly Queue<TimeInterval<T>> _queue;

        public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException("bufferSize");
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException("window");
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");

            _bufferSize = bufferSize;
            _window = window;
            _scheduler = scheduler;

            _stopwatch = _scheduler.StartStopwatch();
            _queue = new Queue<TimeInterval<T>>();
        }

        public ReplayByTime(int bufferSize, TimeSpan window)
            : this(bufferSize, window, SchedulerDefaults.Iteration)
        {
        }

        public ReplayByTime(IScheduler scheduler)
            : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        public ReplayByTime(int bufferSize, IScheduler scheduler)
            : this(bufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        public ReplayByTime(TimeSpan window, IScheduler scheduler)
            : this(InfiniteBufferSize, window, scheduler)
        {
        }

        public ReplayByTime(TimeSpan window)
            : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
        {
        }

        protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
        {
            return new ScheduledObserver<T>(_scheduler, observer);
        }

        protected override void DisposeCore()
        {
            _queue.Clear();
        }

        protected override void Next(T value)
        {
            var now = _stopwatch.Elapsed;

            _queue.Enqueue(new TimeInterval<T>(value, now));
        }

        protected override int Replay(IObserver<T> observer)
        {
            var n = _queue.Count;

            foreach (var item in _queue)
                observer.OnNext(item.Value);

            return n;
        }

        protected override void Trim()
        {
            var now = _stopwatch.Elapsed;

            // Enforce the element count limit first...
            while (_queue.Count > _bufferSize)
                _queue.Dequeue();

            // ...then drop values that are older than the time window.
            while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
                _queue.Dequeue();
        }
    }

    //
    // Below are the non-time based implementations.
    // These do not use a scheduler, ScheduledObserver, stopwatch or TimeInterval; observers are
    // wrapped in a FastImmediateObserver instead.
    // The ReplayOne implementation also removes the need to even have a queue.
    //

    /// <summary>
    /// Implementation for a buffer size of one: only the latest value is kept.
    /// </summary>
    private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
    {
        private bool _hasValue;
        private T _value;

        protected override void Trim()
        {
            //
            // No need to trim.
            //
        }

        protected override void Next(T value)
        {
            _hasValue = true;
            _value = value;
        }

        protected override int Replay(IObserver<T> observer)
        {
            var n = 0;

            if (_hasValue)
            {
                n = 1;
                observer.OnNext(_value);
            }

            return n;
        }

        protected override void DisposeCore()
        {
            // Release the held value and keep the flag consistent with it.
            _hasValue = false;
            _value = default(T);
        }
    }

    /// <summary>
    /// Implementation that keeps at most a fixed number of values.
    /// </summary>
    private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
    {
        private readonly int _bufferSize;

        public ReplayMany(int bufferSize)
            : base(bufferSize)
        {
            _bufferSize = bufferSize;
        }

        protected override void Trim()
        {
            while (_queue.Count > _bufferSize)
                _queue.Dequeue();
        }
    }

    /// <summary>
    /// Implementation that keeps every value.
    /// </summary>
    private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
    {
        public ReplayAll()
            : base(0)
        {
        }

        protected override void Trim()
        {
            //
            // Don't trim, keep all values.
            //
        }
    }

    /// <summary>
    /// Base for the non-time based implementations; wraps observers in a FastImmediateObserver.
    /// </summary>
    private abstract class ReplayBufferBase : ReplayBase
    {
        protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
        {
            return new FastImmediateObserver<T>(observer);
        }

        protected override void DisposeCore()
        {
        }
    }

    /// <summary>
    /// Base for the queue-backed, non-time based implementations. Derived classes only
    /// provide the trimming policy.
    /// </summary>
    private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
    {
        protected readonly Queue<T> _queue;

        protected ReplayManyBase(int queueSize)
            : base()
        {
            // queueSize is only an initial capacity hint, capped at 64; the queue grows on demand.
            _queue = new Queue<T>(Math.Min(queueSize, 64));
        }

        protected override void Next(T value)
        {
            _queue.Enqueue(value);
        }

        protected override int Replay(IObserver<T> observer)
        {
            var n = _queue.Count;

            foreach (var item in _queue)
                observer.OnNext(item);

            return n;
        }

        protected override void DisposeCore()
        {
            _queue.Clear();
        }
    }
}
/// <summary>
/// Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
/// Notifications are recorded under a gate and delivered to the underlying observer by whichever
/// thread calls <see cref="EnsureActive()"/> while no other thread is draining.
/// </summary>
/// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
class FastImmediateObserver<T> : IScheduledObserver<T>
{
    /// <summary>
    /// Gate to control ownership transfer and protect data structures.
    /// </summary>
    private readonly object _gate = new object();

    /// <summary>
    /// Observer to forward notifications to.
    /// </summary>
    private volatile IObserver<T> _observer;

    /// <summary>
    /// Queue to enqueue OnNext notifications into.
    /// </summary>
    private Queue<T> _queue = new Queue<T>();

    /// <summary>
    /// Standby queue to swap out for _queue when transferring ownership. This allows to reuse
    /// queues in case of busy subjects where the initial replay doesn't suffice to catch up.
    /// </summary>
    private Queue<T> _queue2;

    /// <summary>
    /// Exception passed to an OnError notification, if any.
    /// </summary>
    private Exception _error;

    /// <summary>
    /// Indicates whether an OnCompleted notification was received.
    /// </summary>
    private bool _done;

    /// <summary>
    /// Indicates whether the observer is busy, i.e. some thread is actively draining the
    /// notifications that were queued up.
    /// </summary>
    private bool _busy;

    /// <summary>
    /// Indicates whether a failure occurred when the owner was draining the queue. This will
    /// prevent future work to be processed.
    /// </summary>
    private bool _hasFaulted;

    /// <summary>
    /// Creates a new scheduled observer that proxies to the specified observer.
    /// </summary>
    /// <param name="observer">Observer to forward notifications to.</param>
    public FastImmediateObserver(IObserver<T> observer)
    {
        _observer = observer;
    }

    /// <summary>
    /// Disposes the observer. The underlying observer is swapped for a no-op observer so
    /// no further notifications reach it.
    /// </summary>
    public void Dispose()
    {
        Done();
    }

    /// <summary>
    /// Notifies the observer of pending work. This will either cause the current owner to
    /// process the newly enqueued notifications, or it will cause the calling thread to
    /// become the owner and start processing the notification queue.
    /// </summary>
    public void EnsureActive()
    {
        EnsureActive(1);
    }

    /// <summary>
    /// Notifies the observer of pending work. This will either cause the current owner to
    /// process the newly enqueued notifications, or it will cause the calling thread to
    /// become the owner and start processing the notification queue.
    /// </summary>
    /// <param name="count">The number of enqueued notifications to process (ignored).</param>
    public void EnsureActive(int count)
    {
        var isOwner = false;

        lock (_gate)
        {
            //
            // If we failed to process work in the past, we'll simply drop it.
            //
            if (!_hasFaulted)
            {
                //
                // If no-one is processing the notification queue, become the owner.
                //
                if (!_busy)
                {
                    isOwner = true;
                    _busy = true;
                }
            }
        }

        if (isOwner)
        {
            while (true)
            {
                var queue = default(Queue<T>);
                var error = default(Exception);
                var done = false;

                //
                // Steal notifications from the producer side to drain them to the observer.
                //
                lock (_gate)
                {
                    //
                    // Do we have any OnNext notifications to process?
                    //
                    if (_queue.Count > 0)
                    {
                        if (_queue2 == null)
                        {
                            _queue2 = new Queue<T>();
                        }

                        //
                        // Swap out the current queue for a fresh or recycled one. The standby
                        // queue is set to null; when notifications are sent out the processed
                        // queue will become the new standby.
                        //
                        queue = _queue;
                        _queue = _queue2;
                        _queue2 = null;
                    }

                    //
                    // Do we have any terminal notifications to process?
                    //
                    if (_error != null)
                    {
                        error = _error;
                    }
                    else if (_done)
                    {
                        done = true;
                    }
                    else if (queue == null)
                    {
                        //
                        // No work left; quit the loop and let another thread become the
                        // owner in the future.
                        //
                        _busy = false;
                        break;
                    }
                }

                try
                {
                    //
                    // Process OnNext notifications, if any.
                    //
                    if (queue != null)
                    {
                        //
                        // Drain the stolen OnNext notification queue.
                        //
                        while (queue.Count > 0)
                        {
                            _observer.OnNext(queue.Dequeue());
                        }

                        //
                        // The queue is now empty, so we can reuse it by making it the standby
                        // queue for a future swap.
                        //
                        lock (_gate)
                        {
                            _queue2 = queue;
                        }
                    }

                    //
                    // Process terminal notifications, if any. Notice we don't release ownership
                    // after processing these notifications; we simply quit from the loop. This
                    // will cause all processing of the scheduler observer to cease.
                    //
                    if (error != null)
                    {
                        var observer = Done();
                        observer.OnError(error);
                        break;
                    }
                    else if (done)
                    {
                        var observer = Done();
                        observer.OnCompleted();
                        break;
                    }
                }
                catch
                {
                    //
                    // The observer threw. Mark the instance as faulted so later notifications
                    // are dropped, discard what is pending, and let the exception propagate.
                    // Ownership (_busy) is deliberately not released.
                    //
                    lock (_gate)
                    {
                        _hasFaulted = true;
                        _queue.Clear();
                    }

                    throw;
                }
            }
        }
    }

    /// <summary>
    /// Enqueues an OnCompleted notification.
    /// </summary>
    public void OnCompleted()
    {
        lock (_gate)
        {
            if (!_hasFaulted)
            {
                _done = true;
            }
        }
    }

    /// <summary>
    /// Enqueues an OnError notification.
    /// </summary>
    /// <param name="error">Error of the notification.</param>
    public void OnError(Exception error)
    {
        lock (_gate)
        {
            if (!_hasFaulted)
            {
                _error = error;
            }
        }
    }

    /// <summary>
    /// Enqueues an OnNext notification.
    /// </summary>
    /// <param name="value">Value of the notification.</param>
    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (!_hasFaulted)
            {
                _queue.Enqueue(value);
            }
        }
    }

    /// <summary>
    /// Terminates the observer upon receiving terminal notifications, thus preventing
    /// future notifications to go out.
    /// </summary>
    /// <returns>Observer to send terminal notifications to.</returns>
    private IObserver<T> Done()
    {
        // Atomically replace the target with a no-op observer and hand back the previous one.
        return Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
    }
}
}