// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Threading;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
    {
        #region Fields

        /// <summary>
        /// Underlying optimized implementation of the replay subject.
        /// </summary>
        private readonly IReplaySubjectImplementation _implementation;

        #endregion

        #region Constructors

        #region All

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class.
        /// </summary>
        public ReplaySubject()
        {
            _implementation = new ReplayAll();
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified scheduler.
        /// </summary>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(IScheduler scheduler)
        {
            _implementation = new ReplayByTime(scheduler);
        }

        #endregion

        #region Count

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize)
        {
            // Validate up front so the exception names the caller's argument instead of
            // surfacing from the internal queue with an unrelated parameter name.
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException("bufferSize");

            switch (bufferSize)
            {
                case 1:
                    // A single slot suffices; no queue is needed.
                    _implementation = new ReplayOne();
                    break;
                case int.MaxValue:
                    // Treated as "unbounded": nothing is ever trimmed.
                    _implementation = new ReplayAll();
                    break;
                default:
                    _implementation = new ReplayMany(bufferSize);
                    break;
            }
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(bufferSize, scheduler);
        }

        #endregion

        #region Time

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified window.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window)
        {
            _implementation = new ReplayByTime(window);
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified window and scheduler.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(window, scheduler);
        }

        #endregion

        #region Count & Time

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size and window.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window)
        {
            _implementation = new ReplayByTime(bufferSize, window);
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size, window and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(bufferSize, window, scheduler);
        }

        #endregion

        #endregion

        #region Properties

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// </summary>
        public bool HasObservers
        {
            get { return _implementation.HasObservers; }
        }

        #endregion

        #region Methods

        #region Observer implementation

        /// <summary>
        /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all observers.</param>
        public void OnNext(T value)
        {
            _implementation.OnNext(value);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the specified exception.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        public void OnError(Exception error)
        {
            _implementation.OnError(error);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the end of the sequence.
        /// </summary>
        public void OnCompleted()
        {
            _implementation.OnCompleted();
        }

        #endregion

        #region IObservable implementation

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _implementation.Subscribe(observer);
        }

        #endregion

        #region IDisposable implementation

        /// <summary>
        /// Releases all resources used by the current instance of the <see cref="ReplaySubject{T}"/> class and unsubscribe all observers.
        /// </summary>
        public void Dispose()
        {
            _implementation.Dispose();
        }

        #endregion

        #endregion

        /// <summary>
        /// Contract shared by the concrete replay strategies the outer subject delegates to.
        /// </summary>
        private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
        {
            bool HasObservers { get; }
            void Unsubscribe(IObserver<T> observer);
        }

        /// <summary>
        /// Subscription handle that removes its observer from the owning implementation when disposed.
        /// </summary>
        private class Subscription : IDisposable
        {
            private IReplaySubjectImplementation _subject;
            private IObserver<T> _observer;

            public Subscription(IReplaySubjectImplementation subject, IObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            public void Dispose()
            {
                // The atomic exchange ensures only the first Dispose call performs the unsubscribe.
                var observer = Interlocked.Exchange(ref _observer, null);
                if (observer == null)
                    return;

                _subject.Unsubscribe(observer);
                _subject = null;
            }
        }

        /// <summary>
        /// Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
        /// </summary>
        private sealed class ReplayByTime : IReplaySubjectImplementation
        {
            private const int InfiniteBufferSize = int.MaxValue;

            private readonly int _bufferSize;
            private readonly TimeSpan _window;
            private readonly IScheduler _scheduler;
            private readonly IStopwatch _stopwatch;

            private readonly Queue<TimeInterval<T>> _queue;
            private bool _isStopped;
            private Exception _error;

            private ImmutableList<IScheduledObserver<T>> _observers;
            private bool _isDisposed;

            private readonly object _gate = new object();

            public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
            {
                if (bufferSize < 0)
                    throw new ArgumentOutOfRangeException("bufferSize");
                if (window < TimeSpan.Zero)
                    throw new ArgumentOutOfRangeException("window");
                if (scheduler == null)
                    throw new ArgumentNullException("scheduler");

                _bufferSize = bufferSize;
                _window = window;
                _scheduler = scheduler;

                _stopwatch = _scheduler.StartStopwatch();
                _queue = new Queue<TimeInterval<T>>();
                _isStopped = false;
                _error = null;

                _observers = ImmutableList<IScheduledObserver<T>>.Empty;
            }

            public ReplayByTime(int bufferSize, TimeSpan window)
                : this(bufferSize, window, SchedulerDefaults.Iteration)
            {
            }

            public ReplayByTime(IScheduler scheduler)
                : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
            {
            }

            public ReplayByTime(int bufferSize, IScheduler scheduler)
                : this(bufferSize, TimeSpan.MaxValue, scheduler)
            {
            }

            public ReplayByTime(TimeSpan window, IScheduler scheduler)
                : this(InfiniteBufferSize, window, scheduler)
            {
            }

            public ReplayByTime(TimeSpan window)
                : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
            {
            }

            public bool HasObservers
            {
                get
                {
                    // Dispose sets _observers to null, hence the null check on the local snapshot.
                    var observers = _observers;
                    return observers != null && observers.Data.Length > 0;
                }
            }

            /// <summary>
            /// Drops buffered items exceeding the count limit, then items older than the time window relative to <paramref name="now"/>.
            /// </summary>
            private void Trim(TimeSpan now)
            {
                while (_queue.Count > _bufferSize)
                    _queue.Dequeue();

                while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
                    _queue.Dequeue();
            }

            public void OnNext(T value)
            {
                var o = default(IScheduledObserver<T>[]);
                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        var now = _stopwatch.Elapsed;
                        _queue.Enqueue(new TimeInterval<T>(value, now));
                        Trim(now);

                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnNext(value);
                    }
                }

                // EnsureActive is called after the gate has been released.
                // NOTE(review): presumably this starts delivery of the queued notifications on the scheduler - confirm against ScheduledObserver.
                if (o != null)
                    foreach (var observer in o)
                        observer.EnsureActive();
            }

            public void OnError(Exception error)
            {
                if (error == null)
                    throw new ArgumentNullException("error");

                var o = default(IScheduledObserver<T>[]);
                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        var now = _stopwatch.Elapsed;
                        _isStopped = true;
                        _error = error;
                        Trim(now);

                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnError(error);

                        _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                    }
                }

                if (o != null)
                    foreach (var observer in o)
                        observer.EnsureActive();
            }

            public void OnCompleted()
            {
                var o = default(IScheduledObserver<T>[]);
                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        var now = _stopwatch.Elapsed;
                        _isStopped = true;
                        Trim(now);

                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnCompleted();

                        _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                    }
                }

                if (o != null)
                    foreach (var observer in o)
                        observer.EnsureActive();
            }

            public IDisposable Subscribe(IObserver<T> observer)
            {
                if (observer == null)
                    throw new ArgumentNullException("observer");

                var so = new ScheduledObserver<T>(_scheduler, observer);

                // Number of notifications handed to the new observer during replay (buffered items plus a terminal message, if any).
                var n = 0;

                var subscription = new RemovableDisposable(this, so);
                lock (_gate)
                {
                    CheckDisposed();

                    //
                    // Notice the v1.x behavior of always calling Trim is preserved here.
                    //
                    // This may be subject (pun intended) of debate: should this policy
                    // only be applied while the sequence is active? With the current
                    // behavior, a sequence will "die out" after it has terminated by
                    // continuing to drop OnNext notifications from the queue.
                    //
                    // In v1.x, this behavior was due to trimming based on the clock value
                    // returned by scheduler.Now, applied to all but the terminal message
                    // in the queue. Using the IStopwatch has the same effect. Either way,
                    // we guarantee the final notification will be observed, but there's
                    // no way to retain the buffer directly. One approach is to use the
                    // time-based TakeLast operator and apply an unbounded ReplaySubject
                    // to it.
                    //
                    // To conclude, we're keeping the behavior as-is for compatibility
                    // reasons with v1.x.
                    //
                    Trim(_stopwatch.Elapsed);
                    _observers = _observers.Add(so);

                    n = _queue.Count;
                    foreach (var item in _queue)
                        so.OnNext(item.Value);

                    if (_error != null)
                    {
                        n++;
                        so.OnError(_error);
                    }
                    else if (_isStopped)
                    {
                        n++;
                        so.OnCompleted();
                    }
                }

                so.EnsureActive(n);

                return subscription;
            }

            private void Unsubscribe(IScheduledObserver<T> observer)
            {
                lock (_gate)
                {
                    observer.Dispose();
                    if (!_isDisposed)
                    {
                        _observers = _observers.Remove(observer);
                    }
                }
            }

            void IReplaySubjectImplementation.Unsubscribe(IObserver<T> observer)
            {
                // Observers tracked by this implementation are the scheduled wrappers created in Subscribe.
                var so = (IScheduledObserver<T>)observer;
                Unsubscribe(so);
            }

            /// <summary>
            /// Subscription handle that disposes the scheduled observer and removes it from the subject.
            /// </summary>
            sealed class RemovableDisposable : IDisposable
            {
                private readonly ReplayByTime _subject;
                private readonly IScheduledObserver<T> _observer;

                public RemovableDisposable(ReplayByTime subject, IScheduledObserver<T> observer)
                {
                    _subject = subject;
                    _observer = observer;
                }

                public void Dispose()
                {
                    _observer.Dispose();
                    _subject.Unsubscribe(_observer);
                }
            }

            private void CheckDisposed()
            {
                if (_isDisposed)
                    throw new ObjectDisposedException(string.Empty);
            }

            public void Dispose()
            {
                lock (_gate)
                {
                    _isDisposed = true;
                    _observers = null;
                    _queue.Clear();
                }
            }
        }

        //
        // Below are the non-time based implementations.
        // These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
        // The ReplayOne implementation also removes the need to even have a queue.
        //

        /// <summary>
        /// Replay strategy that retains only the most recent value.
        /// </summary>
        private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
        {
            private bool _hasValue;
            private T _value;

            protected override void Trim()
            {
                //
                // No need to trim.
                //
            }

            protected override void AddValueToBuffer(T value)
            {
                _hasValue = true;
                _value = value;
            }

            protected override void ReplayBuffer(IObserver<T> observer)
            {
                if (_hasValue)
                    observer.OnNext(_value);
            }

            protected override void Dispose(bool disposing)
            {
                base.Dispose(disposing);
                _value = default(T);
            }
        }

        /// <summary>
        /// Replay strategy that retains at most a fixed number of the most recent values.
        /// </summary>
        private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
        {
            private readonly int _bufferSize;

            public ReplayMany(int bufferSize)
                : base(bufferSize)
            {
                _bufferSize = bufferSize;
            }

            protected override void Trim()
            {
                while (Queue.Count > _bufferSize)
                    Queue.Dequeue();
            }
        }

        /// <summary>
        /// Replay strategy that retains every value.
        /// </summary>
        private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
        {
            public ReplayAll()
                : base(0)
            {
            }

            protected override void Trim()
            {
                //
                // Don't trim, keep all values.
                //
            }
        }

        /// <summary>
        /// Base class for the non-time based implementations: handles observer bookkeeping, terminal state
        /// and locking, and leaves buffer storage and trimming to derived classes.
        /// </summary>
        private abstract class ReplayBufferBase : IReplaySubjectImplementation
        {
            private readonly object _gate = new object();
            private bool _isDisposed;
            private bool _isStopped;
            private Exception _error;
            private ImmutableList<IObserver<T>> _observers;

            protected ReplayBufferBase()
            {
                _observers = ImmutableList<IObserver<T>>.Empty;
            }

            /// <summary>Removes values from the buffer that should no longer be replayed.</summary>
            protected abstract void Trim();

            /// <summary>Stores <paramref name="value"/> in the buffer.</summary>
            protected abstract void AddValueToBuffer(T value);

            /// <summary>Sends the buffered values to <paramref name="observer"/>.</summary>
            protected abstract void ReplayBuffer(IObserver<T> observer);

            public bool HasObservers
            {
                get
                {
                    // Dispose sets _observers to null, hence the null check on the local snapshot.
                    var observers = _observers;
                    return observers != null && observers.Data.Length > 0;
                }
            }

            public void OnNext(T value)
            {
                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        AddValueToBuffer(value);
                        Trim();

                        var o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnNext(value);
                    }
                }
            }

            public void OnError(Exception error)
            {
                if (error == null)
                    throw new ArgumentNullException("error");

                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        _isStopped = true;
                        _error = error;
                        Trim();

                        var o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnError(error);

                        _observers = ImmutableList<IObserver<T>>.Empty;
                    }
                }
            }

            public void OnCompleted()
            {
                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        _isStopped = true;
                        Trim();

                        var o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnCompleted();

                        _observers = ImmutableList<IObserver<T>>.Empty;
                    }
                }
            }

            public IDisposable Subscribe(IObserver<T> observer)
            {
                if (observer == null)
                    throw new ArgumentNullException("observer");

                var subscription = new Subscription(this, observer);

                lock (_gate)
                {
                    CheckDisposed();

                    //
                    // Notice the v1.x behavior of always calling Trim is preserved here.
                    //
                    // This may be subject (pun intended) of debate: should this policy
                    // only be applied while the sequence is active? With the current
                    // behavior, a sequence will "die out" after it has terminated by
                    // continuing to drop OnNext notifications from the queue.
                    //
                    // In v1.x, this behavior was due to trimming based on the clock value
                    // returned by scheduler.Now, applied to all but the terminal message
                    // in the queue. Using the IStopwatch has the same effect. Either way,
                    // we guarantee the final notification will be observed, but there's
                    // no way to retain the buffer directly. One approach is to use the
                    // time-based TakeLast operator and apply an unbounded ReplaySubject
                    // to it.
                    //
                    // To conclude, we're keeping the behavior as-is for compatibility
                    // reasons with v1.x.
                    //
                    Trim();
                    _observers = _observers.Add(observer);

                    ReplayBuffer(observer);

                    if (_error != null)
                    {
                        observer.OnError(_error);
                    }
                    else if (_isStopped)
                    {
                        observer.OnCompleted();
                    }
                }

                return subscription;
            }

            public void Unsubscribe(IObserver<T> observer)
            {
                lock (_gate)
                {
                    if (!_isDisposed)
                    {
                        _observers = _observers.Remove(observer);
                    }
                }
            }

            private void CheckDisposed()
            {
                if (_isDisposed)
                    throw new ObjectDisposedException(string.Empty);
            }

            public void Dispose()
            {
                Dispose(true);
            }

            protected virtual void Dispose(bool disposing)
            {
                lock (_gate)
                {
                    _isDisposed = true;
                    _observers = null;
                }
            }
        }

        /// <summary>
        /// Base class for the queue-backed, non-time based implementations.
        /// </summary>
        private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
        {
            // Upper bound on the capacity reserved up front. The requested size is a logical
            // limit enforced by Trim, not an amount of storage that must exist immediately.
            private const int MaxInitialQueueCapacity = 64;

            private readonly Queue<T> _queue;

            protected ReplayManyBase(int queueSize)
                : base()
            {
                // Cap the initial capacity so a large replay size does not eagerly allocate a
                // huge backing array; the queue grows on demand as values are buffered.
                _queue = new Queue<T>(Math.Min(queueSize, MaxInitialQueueCapacity));
            }

            protected Queue<T> Queue
            {
                get { return _queue; }
            }

            protected override void AddValueToBuffer(T value)
            {
                _queue.Enqueue(value);
            }

            protected override void ReplayBuffer(IObserver<T> observer)
            {
                foreach (var item in _queue)
                    observer.OnNext(item);
            }

            protected override void Dispose(bool disposing)
            {
                base.Dispose(disposing);
                _queue.Clear();
            }
        }
    }
}