// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Threading;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
    {
        #region Fields

        /// <summary>
        /// Underlying optimized implementation of the replay subject.
        /// </summary>
        private readonly IReplaySubjectImplementation _implementation;

        #endregion

        #region Constructors

        #region All

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class.
        /// </summary>
        public ReplaySubject()
            : this(int.MaxValue)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified scheduler.
        /// </summary>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(IScheduler scheduler)
        {
            _implementation = new ReplayByTime(scheduler);
        }

        #endregion

        #region Count

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize)
        {
            //
            // Validate up front so the exception names the caller's parameter, consistent with
            // the ReplayByTime constructors, rather than surfacing from the Queue<T> constructor.
            //
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException("bufferSize");

            switch (bufferSize)
            {
                case 1:
                    _implementation = new ReplayOne();
                    break;
                case int.MaxValue:
                    _implementation = new ReplayAll();
                    break;
                default:
                    _implementation = new ReplayMany(bufferSize);
                    break;
            }
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(bufferSize, scheduler);
        }

        #endregion

        #region Time

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified window.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window)
        {
            _implementation = new ReplayByTime(window);
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified window and scheduler.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(window, scheduler);
        }

        #endregion

        #region Count & Time

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size and window.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window)
        {
            _implementation = new ReplayByTime(bufferSize, window);
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size, window and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(bufferSize, window, scheduler);
        }

        #endregion

        #endregion

        #region Properties

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// </summary>
        public bool HasObservers
        {
            get { return _implementation.HasObservers; }
        }

        #endregion

        #region Methods

        #region Observer implementation

        /// <summary>
        /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all observers.</param>
        public void OnNext(T value)
        {
            _implementation.OnNext(value);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the specified exception.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        public void OnError(Exception error)
        {
            _implementation.OnError(error);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the end of the sequence.
        /// </summary>
        public void OnCompleted()
        {
            _implementation.OnCompleted();
        }

        #endregion

        #region IObservable implementation

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _implementation.Subscribe(observer);
        }

        #endregion

        #region IDisposable implementation

        /// <summary>
        /// Releases all resources used by the current instance of the <see cref="ReplaySubject{T}"/> class and unsubscribe all observers.
        /// </summary>
        public void Dispose()
        {
            _implementation.Dispose();
        }

        #endregion

        #endregion

        /// <summary>
        /// Contract shared by the buffering strategies the subject delegates to.
        /// </summary>
        private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
        {
            bool HasObservers { get; }
        }

        /// <summary>
        /// Common gating, observer bookkeeping and terminal-state tracking. Derived classes supply the
        /// buffer (<see cref="Next"/>, <see cref="Replay"/>, <see cref="Trim"/>) and the scheduled observer wrapper.
        /// </summary>
        private abstract class ReplayBase : IReplaySubjectImplementation
        {
            private readonly object _gate = new object();

            private ImmutableList<IScheduledObserver<T>> _observers;

            private bool _isStopped;
            private Exception _error;
            private bool _isDisposed;

            public ReplayBase()
            {
                _observers = ImmutableList<IScheduledObserver<T>>.Empty;

                _isStopped = false;
                _error = null;
            }

            public bool HasObservers
            {
                get
                {
                    //
                    // Read the field once without taking the gate; it is set to null by Dispose.
                    //
                    var observers = _observers;
                    return observers != null && observers.Data.Length > 0;
                }
            }

            public void OnNext(T value)
            {
                var o = default(IScheduledObserver<T>[]);

                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        Next(value);
                        Trim();

                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnNext(value);
                    }
                }

                //
                // EnsureActive is invoked after the gate has been released.
                //
                if (o != null)
                {
                    foreach (var observer in o)
                        observer.EnsureActive();
                }
            }

            public void OnError(Exception error)
            {
                if (error == null)
                    throw new ArgumentNullException("error");

                var o = default(IScheduledObserver<T>[]);

                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        _isStopped = true;
                        _error = error;
                        Trim();

                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnError(error);

                        _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                    }
                }

                if (o != null)
                {
                    foreach (var observer in o)
                        observer.EnsureActive();
                }
            }

            public void OnCompleted()
            {
                var o = default(IScheduledObserver<T>[]);

                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        _isStopped = true;
                        Trim();

                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnCompleted();

                        _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                    }
                }

                if (o != null)
                {
                    foreach (var observer in o)
                        observer.EnsureActive();
                }
            }

            public IDisposable Subscribe(IObserver<T> observer)
            {
                if (observer == null)
                    throw new ArgumentNullException("observer");

                var so = CreateScheduledObserver(observer);

                var n = 0;

                var subscription = new Subscription(this, so);

                lock (_gate)
                {
                    CheckDisposed();

                    //
                    // Notice the v1.x behavior of always calling Trim is preserved here.
                    //
                    // This may be subject (pun intended) of debate: should this policy
                    // only be applied while the sequence is active? With the current
                    // behavior, a sequence will "die out" after it has terminated by
                    // continuing to drop OnNext notifications from the queue.
                    //
                    // In v1.x, this behavior was due to trimming based on the clock value
                    // returned by scheduler.Now, applied to all but the terminal message
                    // in the queue. Using the IStopwatch has the same effect. Either way,
                    // we guarantee the final notification will be observed, but there's
                    // no way to retain the buffer directly. One approach is to use the
                    // time-based TakeLast operator and apply an unbounded ReplaySubject
                    // to it.
                    //
                    // To conclude, we're keeping the behavior as-is for compatibility
                    // reasons with v1.x.
                    //
                    Trim();

                    _observers = _observers.Add(so);

                    //
                    // n counts the notifications handed to the new observer: the replayed
                    // values plus the terminal notification, if the subject has stopped.
                    //
                    n = Replay(so);

                    if (_error != null)
                    {
                        n++;
                        so.OnError(_error);
                    }
                    else if (_isStopped)
                    {
                        n++;
                        so.OnCompleted();
                    }
                }

                so.EnsureActive(n);

                return subscription;
            }

            public void Dispose()
            {
                lock (_gate)
                {
                    _isDisposed = true;
                    _observers = null;
                    DisposeCore();
                }
            }

            protected abstract void DisposeCore();

            protected abstract void Next(T value);

            protected abstract int Replay(IObserver<T> observer);

            protected abstract void Trim();

            protected abstract IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer);

            private void CheckDisposed()
            {
                if (_isDisposed)
                    throw new ObjectDisposedException(string.Empty);
            }

            private void Unsubscribe(IScheduledObserver<T> observer)
            {
                lock (_gate)
                {
                    //
                    // After Dispose the observer list is null, so there is nothing to remove from.
                    //
                    if (!_isDisposed)
                    {
                        _observers = _observers.Remove(observer);
                    }
                }
            }

            /// <summary>
            /// Handle returned from Subscribe; disposes the scheduled observer and removes it from the subject.
            /// </summary>
            private sealed class Subscription : IDisposable
            {
                private readonly ReplayBase _subject;
                private readonly IScheduledObserver<T> _observer;

                public Subscription(ReplayBase subject, IScheduledObserver<T> observer)
                {
                    _subject = subject;
                    _observer = observer;
                }

                public void Dispose()
                {
                    _observer.Dispose();
                    _subject.Unsubscribe(_observer);
                }
            }
        }

        /// <summary>
        /// Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
        /// </summary>
        private sealed class ReplayByTime : ReplayBase
        {
            private const int InfiniteBufferSize = int.MaxValue;

            private readonly int _bufferSize;
            private readonly TimeSpan _window;
            private readonly IScheduler _scheduler;
            private readonly IStopwatch _stopwatch;

            private readonly Queue<TimeInterval<T>> _queue;

            public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
            {
                if (bufferSize < 0)
                    throw new ArgumentOutOfRangeException("bufferSize");
                if (window < TimeSpan.Zero)
                    throw new ArgumentOutOfRangeException("window");
                if (scheduler == null)
                    throw new ArgumentNullException("scheduler");

                _bufferSize = bufferSize;
                _window = window;
                _scheduler = scheduler;

                _stopwatch = _scheduler.StartStopwatch();
                _queue = new Queue<TimeInterval<T>>();
            }

            public ReplayByTime(int bufferSize, TimeSpan window)
                : this(bufferSize, window, SchedulerDefaults.Iteration)
            {
            }

            public ReplayByTime(IScheduler scheduler)
                : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
            {
            }

            public ReplayByTime(int bufferSize, IScheduler scheduler)
                : this(bufferSize, TimeSpan.MaxValue, scheduler)
            {
            }

            public ReplayByTime(TimeSpan window, IScheduler scheduler)
                : this(InfiniteBufferSize, window, scheduler)
            {
            }

            public ReplayByTime(TimeSpan window)
                : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
            {
            }

            protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
            {
                return new ScheduledObserver<T>(_scheduler, observer);
            }

            protected override void DisposeCore()
            {
                _queue.Clear();
            }

            protected override void Next(T value)
            {
                //
                // Stamp each value with the stopwatch reading so Trim can age it out.
                //
                var now = _stopwatch.Elapsed;

                _queue.Enqueue(new TimeInterval<T>(value, now));
            }

            protected override int Replay(IObserver<T> observer)
            {
                var n = _queue.Count;

                foreach (var item in _queue)
                    observer.OnNext(item.Value);

                return n;
            }

            protected override void Trim()
            {
                var now = _stopwatch.Elapsed;

                //
                // Enforce the count limit first, then drop entries older than the window.
                //
                while (_queue.Count > _bufferSize)
                    _queue.Dequeue();

                while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
                    _queue.Dequeue();
            }
        }

        //
        // Below are the non-time based implementations.
        // These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
        // The ReplayOne implementation also removes the need to even have a queue.
        //

        /// <summary>
        /// Buffer of exactly one element, kept in a field instead of a queue.
        /// </summary>
        private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
        {
            private bool _hasValue;
            private T _value;

            protected override void Trim()
            {
                //
                // No need to trim.
                //
            }

            protected override void Next(T value)
            {
                _hasValue = true;
                _value = value;
            }

            protected override int Replay(IObserver<T> observer)
            {
                var n = 0;

                if (_hasValue)
                {
                    n = 1;
                    observer.OnNext(_value);
                }

                return n;
            }

            protected override void DisposeCore()
            {
                _value = default(T);
            }
        }

        /// <summary>
        /// Count-bounded buffer; trims the oldest elements beyond the configured size.
        /// </summary>
        private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
        {
            private readonly int _bufferSize;

            public ReplayMany(int bufferSize)
                : base(bufferSize)
            {
                _bufferSize = bufferSize;
            }

            protected override void Trim()
            {
                while (_queue.Count > _bufferSize)
                    _queue.Dequeue();
            }
        }

        /// <summary>
        /// Unbounded buffer; never trims.
        /// </summary>
        private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
        {
            public ReplayAll()
                : base(0)
            {
            }

            protected override void Trim()
            {
                //
                // Don't trim, keep all values.
                //
            }
        }

        /// <summary>
        /// Base for the non-time based implementations; wraps observers in a <see cref="FastImmediateObserver{T}"/>.
        /// </summary>
        private abstract class ReplayBufferBase : ReplayBase
        {
            protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
            {
                return new FastImmediateObserver<T>(observer);
            }

            protected override void DisposeCore()
            {
            }
        }

        /// <summary>
        /// Base for the queue-backed, non-time based implementations.
        /// </summary>
        private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
        {
            protected readonly Queue<T> _queue;

            protected ReplayManyBase(int queueSize)
                : base()
            {
                //
                // Cap the initial capacity at 64; the queue grows on demand beyond that.
                //
                _queue = new Queue<T>(Math.Min(queueSize, 64));
            }

            protected override void Next(T value)
            {
                _queue.Enqueue(value);
            }

            protected override int Replay(IObserver<T> observer)
            {
                var n = _queue.Count;

                foreach (var item in _queue)
                    observer.OnNext(item);

                return n;
            }

            protected override void DisposeCore()
            {
                _queue.Clear();
            }
        }
    }

    /// <summary>
    /// Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
    /// </summary>
    /// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
    class FastImmediateObserver<T> : IScheduledObserver<T>
    {
        /// <summary>
        /// Gate to control ownership transfer and protect data structures.
        /// </summary>
        private readonly object _gate = new object();

        /// <summary>
        /// Observer to forward notifications to.
        /// </summary>
        private volatile IObserver<T> _observer;

        /// <summary>
        /// Queue to enqueue OnNext notifications into.
        /// </summary>
        private Queue<T> _queue = new Queue<T>();

        /// <summary>
        /// Standby queue to swap out for _queue when transferring ownership. This allows to reuse
        /// queues in case of busy subjects where the initial replay doesn't suffice to catch up.
        /// </summary>
        private Queue<T> _queue2;

        /// <summary>
        /// Exception passed to an OnError notification, if any.
        /// </summary>
        private Exception _error;

        /// <summary>
        /// Indicates whether an OnCompleted notification was received.
        /// </summary>
        private bool _done;

        /// <summary>
        /// Indicates whether the observer is busy, i.e. some thread is actively draining the
        /// notifications that were queued up.
        /// </summary>
        private bool _busy;

        /// <summary>
        /// Indicates whether a failure occurred when the owner was draining the queue. This will
        /// prevent future work to be processed.
        /// </summary>
        private bool _hasFaulted;

        /// <summary>
        /// Creates a new scheduled observer that proxies to the specified observer.
        /// </summary>
        /// <param name="observer">Observer to forward notifications to.</param>
        public FastImmediateObserver(IObserver<T> observer)
        {
            _observer = observer;
        }

        /// <summary>
        /// Disposes the observer.
        /// </summary>
        public void Dispose()
        {
            Done();
        }

        /// <summary>
        /// Notifies the observer of pending work. This will either cause the current owner to
        /// process the newly enqueued notifications, or it will cause the calling thread to
        /// become the owner and start processing the notification queue.
        /// </summary>
        public void EnsureActive()
        {
            EnsureActive(1);
        }

        /// <summary>
        /// Notifies the observer of pending work. This will either cause the current owner to
        /// process the newly enqueued notifications, or it will cause the calling thread to
        /// become the owner and start processing the notification queue.
        /// </summary>
        /// <param name="count">The number of enqueued notifications to process (ignored).</param>
        public void EnsureActive(int count)
        {
            var isOwner = false;

            lock (_gate)
            {
                //
                // If we failed to process work in the past, we'll simply drop it.
                //
                if (!_hasFaulted)
                {
                    //
                    // If no-one is processing the notification queue, become the owner.
                    //
                    if (!_busy)
                    {
                        isOwner = true;
                        _busy = true;
                    }
                }
            }

            if (isOwner)
            {
                while (true)
                {
                    var queue = default(Queue<T>);
                    var error = default(Exception);
                    var done = false;

                    //
                    // Steal notifications from the producer side to drain them to the observer.
                    //
                    lock (_gate)
                    {
                        //
                        // Do we have any OnNext notifications to process?
                        //
                        if (_queue.Count > 0)
                        {
                            if (_queue2 == null)
                            {
                                _queue2 = new Queue<T>();
                            }

                            //
                            // Swap out the current queue for a fresh or recycled one. The standby
                            // queue is set to null; when notifications are sent out the processed
                            // queue will become the new standby.
                            //
                            queue = _queue;
                            _queue = _queue2;
                            _queue2 = null;
                        }

                        //
                        // Do we have any terminal notifications to process?
                        //
                        if (_error != null)
                        {
                            error = _error;
                        }
                        else if (_done)
                        {
                            done = true;
                        }
                        else if (queue == null)
                        {
                            //
                            // No work left; quit the loop and let another thread become the
                            // owner in the future.
                            //
                            _busy = false;
                            break;
                        }
                    }

                    try
                    {
                        //
                        // Process OnNext notifications, if any.
                        //
                        if (queue != null)
                        {
                            //
                            // Drain the stolen OnNext notification queue.
                            //
                            while (queue.Count > 0)
                            {
                                _observer.OnNext(queue.Dequeue());
                            }

                            //
                            // The queue is now empty, so we can reuse it by making it the standby
                            // queue for a future swap.
                            //
                            lock (_gate)
                            {
                                _queue2 = queue;
                            }
                        }

                        //
                        // Process terminal notifications, if any. Notice we don't release ownership
                        // after processing these notifications; we simply quit from the loop. This
                        // will cause all processing of the scheduler observer to cease.
                        //
                        if (error != null)
                        {
                            var observer = Done();
                            observer.OnError(error);
                            break;
                        }
                        else if (done)
                        {
                            var observer = Done();
                            observer.OnCompleted();
                            break;
                        }
                    }
                    catch
                    {
                        lock (_gate)
                        {
                            _hasFaulted = true;
                            _queue.Clear();
                        }

                        throw;
                    }
                }
            }
        }

        /// <summary>
        /// Enqueues an OnCompleted notification.
        /// </summary>
        public void OnCompleted()
        {
            lock (_gate)
            {
                if (!_hasFaulted)
                {
                    _done = true;
                }
            }
        }

        /// <summary>
        /// Enqueues an OnError notification.
        /// </summary>
        /// <param name="error">Error of the notification.</param>
        public void OnError(Exception error)
        {
            lock (_gate)
            {
                if (!_hasFaulted)
                {
                    _error = error;
                }
            }
        }

        /// <summary>
        /// Enqueues an OnNext notification.
        /// </summary>
        /// <param name="value">Value of the notification.</param>
        public void OnNext(T value)
        {
            lock (_gate)
            {
                if (!_hasFaulted)
                {
                    _queue.Enqueue(value);
                }
            }
        }

        /// <summary>
        /// Terminates the observer upon receiving terminal notifications, thus preventing
        /// future notifications to go out.
        /// </summary>
        /// <returns>Observer to send terminal notifications to.</returns>
        private IObserver<T> Done()
        {
            //
            // Passing a volatile field by reference raises CS0420; this is the Interlocked
            // case the warning's documentation calls out as an exception, so suppress it here only.
            //
#pragma warning disable 0420
            return Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
#pragma warning restore 0420
        }
    }
}