// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Threading;
namespace System.Reactive.Subjects
{
///
/// Represents an object that is both an observable sequence as well as an observer.
/// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
///
/// The type of the elements processed by the subject.
public sealed class ReplaySubject : ISubject, IDisposable
{
#region Fields
///
/// Underlying optimized implementation of the replay subject.
///
private readonly IReplaySubjectImplementation _implementation;
#endregion
#region Constructors
#region All
///
/// Initializes a new instance of the class.
///
public ReplaySubject()
{
_implementation = new ReplayAll();
}
///
/// Initializes a new instance of the class with the specified scheduler.
///
/// Scheduler the observers are invoked on.
/// is null.
public ReplaySubject(IScheduler scheduler)
{
_implementation = new ReplayByTime(scheduler);
}
#endregion
#region Count
///
/// Initializes a new instance of the class with the specified buffer size.
///
/// Maximum element count of the replay buffer.
/// is less than zero.
public ReplaySubject(int bufferSize)
{
switch (bufferSize)
{
case 1:
_implementation = new ReplayOne();
break;
case int.MaxValue:
_implementation = new ReplayAll();
break;
default:
_implementation = new ReplayMany(bufferSize);
break;
}
}
///
/// Initializes a new instance of the class with the specified buffer size and scheduler.
///
/// Maximum element count of the replay buffer.
/// Scheduler the observers are invoked on.
/// is null.
/// is less than zero.
public ReplaySubject(int bufferSize, IScheduler scheduler)
{
_implementation = new ReplayByTime(bufferSize, scheduler);
}
#endregion
#region Time
///
/// Initializes a new instance of the class with the specified window.
///
/// Maximum time length of the replay buffer.
/// is less than TimeSpan.Zero.
public ReplaySubject(TimeSpan window)
{
_implementation = new ReplayByTime(window);
}
///
/// Initializes a new instance of the class with the specified window and scheduler.
///
/// Maximum time length of the replay buffer.
/// Scheduler the observers are invoked on.
/// is null.
/// is less than TimeSpan.Zero.
public ReplaySubject(TimeSpan window, IScheduler scheduler)
{
_implementation = new ReplayByTime(window, scheduler);
}
#endregion
#region Count & Time
///
/// Initializes a new instance of the class with the specified buffer size and window.
///
/// Maximum element count of the replay buffer.
/// Maximum time length of the replay buffer.
/// is less than zero. -or- is less than TimeSpan.Zero.
public ReplaySubject(int bufferSize, TimeSpan window)
{
_implementation = new ReplayByTime(bufferSize, window);
}
///
/// Initializes a new instance of the class with the specified buffer size, window and scheduler.
///
/// Maximum element count of the replay buffer.
/// Maximum time length of the replay buffer.
/// Scheduler the observers are invoked on.
/// is less than zero. -or- is less than TimeSpan.Zero.
/// is null.
public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
_implementation = new ReplayByTime(bufferSize, window, scheduler);
}
#endregion
#endregion
#region Properties
///
/// Indicates whether the subject has observers subscribed to it.
///
public bool HasObservers
{
get { return _implementation.HasObservers; }
}
#endregion
#region Methods
#region Observer implementation
///
/// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
///
/// The value to send to all observers.
public void OnNext(T value)
{
_implementation.OnNext(value);
}
///
/// Notifies all subscribed and future observers about the specified exception.
///
/// The exception to send to all observers.
/// is null.
public void OnError(Exception error)
{
_implementation.OnError(error);
}
///
/// Notifies all subscribed and future observers about the end of the sequence.
///
public void OnCompleted()
{
_implementation.OnCompleted();
}
#endregion
#region IObservable implementation
///
/// Subscribes an observer to the subject.
///
/// Observer to subscribe to the subject.
/// Disposable object that can be used to unsubscribe the observer from the subject.
/// is null.
public IDisposable Subscribe(IObserver observer)
{
return _implementation.Subscribe(observer);
}
#endregion
#region IDisposable implementation
///
/// Releases all resources used by the current instance of the class and unsubscribe all observers.
///
public void Dispose()
{
_implementation.Dispose();
}
#endregion
#endregion
private interface IReplaySubjectImplementation : ISubject, IDisposable
{
bool HasObservers { get; }
void Unsubscribe(IObserver observer);
}
private class Subscription : IDisposable
{
private IReplaySubjectImplementation _subject;
private IObserver _observer;
public Subscription(IReplaySubjectImplementation subject, IObserver observer)
{
_subject = subject;
_observer = observer;
}
public void Dispose()
{
var observer = Interlocked.Exchange(ref _observer, null);
if (observer == null)
return;
_subject.Unsubscribe(observer);
_subject = null;
}
}
///
/// Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
///
private sealed class ReplayByTime : IReplaySubjectImplementation
{
private const int InfiniteBufferSize = int.MaxValue;
private readonly int _bufferSize;
private readonly TimeSpan _window;
private readonly IScheduler _scheduler;
private readonly IStopwatch _stopwatch;
private readonly Queue> _queue;
private bool _isStopped;
private Exception _error;
private ImmutableList> _observers;
private bool _isDisposed;
private readonly object _gate = new object();
public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
{
if (bufferSize < 0)
throw new ArgumentOutOfRangeException("bufferSize");
if (window < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("window");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
_bufferSize = bufferSize;
_window = window;
_scheduler = scheduler;
_stopwatch = _scheduler.StartStopwatch();
_queue = new Queue>();
_isStopped = false;
_error = null;
_observers = ImmutableList>.Empty;
}
public ReplayByTime(int bufferSize, TimeSpan window)
: this(bufferSize, window, SchedulerDefaults.Iteration)
{
}
public ReplayByTime(IScheduler scheduler)
: this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
{
}
public ReplayByTime(int bufferSize, IScheduler scheduler)
: this(bufferSize, TimeSpan.MaxValue, scheduler)
{
}
public ReplayByTime(TimeSpan window, IScheduler scheduler)
: this(InfiniteBufferSize, window, scheduler)
{
}
public ReplayByTime(TimeSpan window)
: this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
{
}
public bool HasObservers
{
get
{
var observers = _observers;
return observers != null && observers.Data.Length > 0;
}
}
private void Trim(TimeSpan now)
{
while (_queue.Count > _bufferSize)
_queue.Dequeue();
while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
_queue.Dequeue();
}
public void OnNext(T value)
{
var o = default(IScheduledObserver[]);
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
var now = _stopwatch.Elapsed;
_queue.Enqueue(new TimeInterval(value, now));
Trim(now);
o = _observers.Data;
foreach (var observer in o)
observer.OnNext(value);
}
}
if (o != null)
foreach (var observer in o)
observer.EnsureActive();
}
public void OnError(Exception error)
{
if (error == null)
throw new ArgumentNullException("error");
var o = default(IScheduledObserver[]);
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
var now = _stopwatch.Elapsed;
_isStopped = true;
_error = error;
Trim(now);
o = _observers.Data;
foreach (var observer in o)
observer.OnError(error);
_observers = ImmutableList>.Empty;
}
}
if (o != null)
foreach (var observer in o)
observer.EnsureActive();
}
public void OnCompleted()
{
var o = default(IScheduledObserver[]);
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
var now = _stopwatch.Elapsed;
_isStopped = true;
Trim(now);
o = _observers.Data;
foreach (var observer in o)
observer.OnCompleted();
_observers = ImmutableList>.Empty;
}
}
if (o != null)
foreach (var observer in o)
observer.EnsureActive();
}
public IDisposable Subscribe(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
var so = new ScheduledObserver(_scheduler, observer);
var n = 0;
var subscription = new RemovableDisposable(this, so);
lock (_gate)
{
CheckDisposed();
//
// Notice the v1.x behavior of always calling Trim is preserved here.
//
// This may be subject (pun intended) of debate: should this policy
// only be applied while the sequence is active? With the current
// behavior, a sequence will "die out" after it has terminated by
// continuing to drop OnNext notifications from the queue.
//
// In v1.x, this behavior was due to trimming based on the clock value
// returned by scheduler.Now, applied to all but the terminal message
// in the queue. Using the IStopwatch has the same effect. Either way,
// we guarantee the final notification will be observed, but there's
// no way to retain the buffer directly. One approach is to use the
// time-based TakeLast operator and apply an unbounded ReplaySubject
// to it.
//
// To conclude, we're keeping the behavior as-is for compatibility
// reasons with v1.x.
//
Trim(_stopwatch.Elapsed);
_observers = _observers.Add(so);
n = _queue.Count;
foreach (var item in _queue)
so.OnNext(item.Value);
if (_error != null)
{
n++;
so.OnError(_error);
}
else if (_isStopped)
{
n++;
so.OnCompleted();
}
}
so.EnsureActive(n);
return subscription;
}
private void Unsubscribe(IScheduledObserver observer)
{
lock (_gate)
{
observer.Dispose();
if (!_isDisposed)
{
_observers = _observers.Remove(observer);
}
}
}
void IReplaySubjectImplementation.Unsubscribe(IObserver observer)
{
var so = (IScheduledObserver)observer;
Unsubscribe(so);
}
sealed class RemovableDisposable : IDisposable
{
private readonly ReplayByTime _subject;
private readonly IScheduledObserver _observer;
public RemovableDisposable(ReplayByTime subject, IScheduledObserver observer)
{
_subject = subject;
_observer = observer;
}
public void Dispose()
{
_observer.Dispose();
_subject.Unsubscribe(_observer);
}
}
private void CheckDisposed()
{
if (_isDisposed)
throw new ObjectDisposedException(string.Empty);
}
public void Dispose()
{
lock (_gate)
{
_isDisposed = true;
_observers = null;
_queue.Clear();
}
}
}
//
// Below are the non-time based implementations.
// These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
// The ReplayOne implementation also removes the need to even have a queue.
//
private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
{
private bool _hasValue;
private T _value;
protected override void Trim()
{
//
// No need to trim.
//
}
protected override void AddValueToBuffer(T value)
{
_hasValue = true;
_value = value;
}
protected override void ReplayBuffer(IObserver observer)
{
if (_hasValue)
observer.OnNext(_value);
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_value = default(T);
}
}
private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
{
private readonly int _bufferSize;
public ReplayMany(int bufferSize)
: base(bufferSize)
{
_bufferSize = bufferSize;
}
protected override void Trim()
{
while (Queue.Count > _bufferSize)
Queue.Dequeue();
}
}
private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
{
public ReplayAll()
: base(0)
{
}
protected override void Trim()
{
//
// Don't trim, keep all values.
//
}
}
private abstract class ReplayBufferBase : IReplaySubjectImplementation
{
private readonly object _gate = new object();
private bool _isDisposed;
private bool _isStopped;
private Exception _error;
private ImmutableList> _observers;
protected ReplayBufferBase()
{
_observers = ImmutableList>.Empty;
}
protected abstract void Trim();
protected abstract void AddValueToBuffer(T value);
protected abstract void ReplayBuffer(IObserver observer);
public bool HasObservers
{
get
{
var observers = _observers;
return observers != null && observers.Data.Length > 0;
}
}
public void OnNext(T value)
{
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
AddValueToBuffer(value);
Trim();
var o = _observers.Data;
foreach (var observer in o)
observer.OnNext(value);
}
}
}
public void OnError(Exception error)
{
if (error == null)
throw new ArgumentNullException("error");
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
_isStopped = true;
_error = error;
Trim();
var o = _observers.Data;
foreach (var observer in o)
observer.OnError(error);
_observers = ImmutableList>.Empty;
}
}
}
public void OnCompleted()
{
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
_isStopped = true;
Trim();
var o = _observers.Data;
foreach (var observer in o)
observer.OnCompleted();
_observers = ImmutableList>.Empty;
}
}
}
public IDisposable Subscribe(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
var subscription = new Subscription(this, observer);
lock (_gate)
{
CheckDisposed();
//
// Notice the v1.x behavior of always calling Trim is preserved here.
//
// This may be subject (pun intended) of debate: should this policy
// only be applied while the sequence is active? With the current
// behavior, a sequence will "die out" after it has terminated by
// continuing to drop OnNext notifications from the queue.
//
// In v1.x, this behavior was due to trimming based on the clock value
// returned by scheduler.Now, applied to all but the terminal message
// in the queue. Using the IStopwatch has the same effect. Either way,
// we guarantee the final notification will be observed, but there's
// no way to retain the buffer directly. One approach is to use the
// time-based TakeLast operator and apply an unbounded ReplaySubject
// to it.
//
// To conclude, we're keeping the behavior as-is for compatibility
// reasons with v1.x.
//
Trim();
_observers = _observers.Add(observer);
ReplayBuffer(observer);
if (_error != null)
{
observer.OnError(_error);
}
else if (_isStopped)
{
observer.OnCompleted();
}
}
return subscription;
}
public void Unsubscribe(IObserver observer)
{
lock (_gate)
{
if (!_isDisposed)
{
_observers = _observers.Remove(observer);
}
}
}
private void CheckDisposed()
{
if (_isDisposed)
throw new ObjectDisposedException(string.Empty);
}
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
lock (_gate)
{
_isDisposed = true;
_observers = null;
}
}
}
private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
{
private readonly Queue _queue;
protected ReplayManyBase(int queueSize)
: base()
{
_queue = new Queue(queueSize);
}
protected Queue Queue
{
get
{
return _queue;
}
}
protected override void AddValueToBuffer(T value)
{
_queue.Enqueue(value);
}
protected override void ReplayBuffer(IObserver observer)
{
foreach (var item in _queue)
observer.OnNext(item);
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_queue.Clear();
}
}
}
}