// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
    {
        // Buffer size used by the constructors that do not take an explicit element count.
        private const int InfiniteBufferSize = int.MaxValue;

        private readonly int _bufferSize;
        private readonly TimeSpan _window;
        private readonly IScheduler _scheduler;
        private readonly IStopwatch _stopwatch;

        // Replay buffer. Each entry pairs a value with the stopwatch reading taken when it was enqueued.
        private readonly Queue<TimeInterval<T>> _queue;

        // Set once OnError or OnCompleted has been accepted; later notifications are ignored.
        private bool _isStopped;

        // Terminal error, replayed to observers that subscribe after OnError.
        private Exception _error;

        // Reassigned (never mutated in place by this class) under _gate; set to null by Dispose.
        private ImmutableList<ScheduledObserver<T>> _observers;

        private bool _isDisposed;

        // Guards the buffer, the observer list and the stopped/disposed state.
        private readonly object _gate = new object();

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified buffer size, window and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            if (bufferSize < 0)
            {
                throw new ArgumentOutOfRangeException("bufferSize");
            }

            if (window < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("window");
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }

            _bufferSize = bufferSize;
            _window = window;
            _scheduler = scheduler;

            // All buffer timestamps are readings of this stopwatch, started at construction time.
            _stopwatch = _scheduler.StartStopwatch();
            _queue = new Queue<TimeInterval<T>>();

            _isStopped = false;
            _error = null;

            _observers = new ImmutableList<ScheduledObserver<T>>();
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified buffer size and window.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window)
            : this(bufferSize, window, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class.
        /// </summary>
        public ReplaySubject()
            : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified scheduler.
        /// </summary>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(IScheduler scheduler)
            : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified buffer size and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize, IScheduler scheduler)
            : this(bufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified buffer size.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize)
            : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified window and scheduler.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window, IScheduler scheduler)
            : this(InfiniteBufferSize, window, scheduler)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified window.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window)
            : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// </summary>
        public bool HasObservers
        {
            get
            {
                // Read once into a local without taking _gate: Dispose sets the field to null,
                // so the null check and the length check must look at the same reference.
                var observers = _observers;
                return observers != null && observers.Data.Length > 0;
            }
        }

        /// <summary>
        /// Drops the oldest buffered elements until the buffer satisfies both the element count
        /// limit and the time window. Every caller in this class holds <c>_gate</c>.
        /// </summary>
        /// <param name="now">Current reading of the subject's stopwatch.</param>
        private void Trim(TimeSpan now)
        {
            while (_queue.Count > _bufferSize)
            {
                _queue.Dequeue();
            }

            // An element's age is the current stopwatch reading minus the reading stored when it was enqueued.
            while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
            {
                _queue.Dequeue();
            }
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all observers.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnNext(T value)
        {
            var o = default(ScheduledObserver<T>[]);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    var now = _stopwatch.Elapsed;
                    _queue.Enqueue(new TimeInterval<T>(value, now));
                    Trim(now);

                    o = _observers.Data;
                    foreach (var observer in o)
                    {
                        observer.OnNext(value);
                    }
                }
            }

            // EnsureActive is deliberately called after the lock is released.
            // NOTE(review): presumably ScheduledObserver only queues the notification above and
            // EnsureActive starts delivery on the scheduler - confirm against ScheduledObserver.
            if (o != null)
            {
                foreach (var observer in o)
                {
                    observer.EnsureActive();
                }
            }
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the specified exception.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException("error");
            }

            var o = default(ScheduledObserver<T>[]);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    var now = _stopwatch.Elapsed;
                    _isStopped = true;
                    _error = error;
                    Trim(now);

                    o = _observers.Data;
                    foreach (var observer in o)
                    {
                        observer.OnError(error);
                    }

                    // The current observers have received their terminal notification; start from an empty list.
                    _observers = new ImmutableList<ScheduledObserver<T>>();
                }
            }

            // Called outside the lock, on the snapshot taken above.
            if (o != null)
            {
                foreach (var observer in o)
                {
                    observer.EnsureActive();
                }
            }
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the end of the sequence.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnCompleted()
        {
            var o = default(ScheduledObserver<T>[]);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    var now = _stopwatch.Elapsed;
                    _isStopped = true;
                    Trim(now);

                    o = _observers.Data;
                    foreach (var observer in o)
                    {
                        observer.OnCompleted();
                    }

                    // The current observers have received their terminal notification; start from an empty list.
                    _observers = new ImmutableList<ScheduledObserver<T>>();
                }
            }

            // Called outside the lock, on the snapshot taken above.
            if (o != null)
            {
                foreach (var observer in o)
                {
                    observer.EnsureActive();
                }
            }
        }

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            var so = new ScheduledObserver<T>(_scheduler, observer);

            // Number of notifications handed to the new observer while holding the lock.
            var n = 0;

            var subscription = new RemovableDisposable(this, so);

            lock (_gate)
            {
                CheckDisposed();

                //
                // Notice the v1.x behavior of always calling Trim is preserved here.
                //
                // This may be subject (pun intended) of debate: should this policy
                // only be applied while the sequence is active? With the current
                // behavior, a sequence will "die out" after it has terminated by
                // continuing to drop OnNext notifications from the queue.
                //
                // In v1.x, this behavior was due to trimming based on the clock value
                // returned by scheduler.Now, applied to all but the terminal message
                // in the queue. Using the IStopwatch has the same effect. Either way,
                // we guarantee the final notification will be observed, but there's
                // no way to retain the buffer directly. One approach is to use the
                // time-based TakeLast operator and apply an unbounded ReplaySubject
                // to it.
                //
                // To conclude, we're keeping the behavior as-is for compatibility
                // reasons with v1.x.
                //
                Trim(_stopwatch.Elapsed);

                _observers = _observers.Add(so);

                // Replay whatever survived trimming, oldest first.
                n = _queue.Count;
                foreach (var item in _queue)
                {
                    so.OnNext(item.Value);
                }

                // Follow the replayed elements with the terminal notification, if there is one.
                if (_error != null)
                {
                    n++;
                    so.OnError(_error);
                }
                else if (_isStopped)
                {
                    n++;
                    so.OnCompleted();
                }
            }

            // Outside the lock, passing the number of notifications handed over above.
            so.EnsureActive(n);

            return subscription;
        }

        /// <summary>
        /// Removes the given observer from the observer list; does nothing once the subject has been disposed.
        /// </summary>
        /// <param name="observer">The scheduled observer created by <c>Subscribe</c>.</param>
        private void Unsubscribe(ScheduledObserver<T> observer)
        {
            lock (_gate)
            {
                // After Dispose the list is null, so there is nothing to remove from.
                if (!_isDisposed)
                {
                    _observers = _observers.Remove(observer);
                }
            }
        }

        /// <summary>
        /// Subscription handle returned by <c>Subscribe</c>: disposes the scheduled observer and
        /// removes it from the owning subject.
        /// </summary>
        private sealed class RemovableDisposable : IDisposable
        {
            private readonly ReplaySubject<T> _subject;
            private readonly ScheduledObserver<T> _observer;

            public RemovableDisposable(ReplaySubject<T> subject, ScheduledObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            public void Dispose()
            {
                _observer.Dispose();
                _subject.Unsubscribe(_observer);
            }
        }

        /// <summary>
        /// Throws if the subject has been disposed. Every caller in this class holds <c>_gate</c>.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        private void CheckDisposed()
        {
            if (_isDisposed)
            {
                throw new ObjectDisposedException(string.Empty);
            }
        }

        /// <summary>
        /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}"/> class and unsubscribes all observers.
        /// </summary>
        public void Dispose()
        {
            lock (_gate)
            {
                _isDisposed = true;
                _observers = null;

                // Every path that reads the buffer calls CheckDisposed first, so these elements
                // can never be replayed again. Drop them now instead of keeping them reachable
                // for as long as the subject itself is referenced.
                _queue.Clear();
            }
        }
    }
}