// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class ReplaySubject<T> : SubjectBase<T>, IDisposable
    {
        #region Fields

        /// <summary>
        /// Underlying optimized implementation of the replay subject.
        /// </summary>
        private readonly SubjectBase<T> _implementation;

        #endregion

        #region Constructors

        #region All

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class.
        /// </summary>
        public ReplaySubject()
            : this(int.MaxValue)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified scheduler.
        /// </summary>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(IScheduler scheduler)
        {
            _implementation = new ReplayByTime(scheduler);
        }

        #endregion

        #region Count

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified buffer size.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize)
        {
            //
            // Validate up front. Without this check a negative size is only rejected
            // indirectly, by the Queue<T> constructor invoked from ReplayManyBase, which
            // reports its own parameter name instead of "bufferSize".
            //
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException("bufferSize");

            //
            // Pick the cheapest implementation that honors the requested buffer size.
            //
            switch (bufferSize)
            {
                case 1:
                    _implementation = new ReplayOne();
                    break;
                case int.MaxValue:
                    _implementation = new ReplayAll();
                    break;
                default:
                    _implementation = new ReplayMany(bufferSize);
                    break;
            }
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified buffer size and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(bufferSize, scheduler);
        }

        #endregion

        #region Time

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified window.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window)
        {
            _implementation = new ReplayByTime(window);
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified window and scheduler.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(window, scheduler);
        }

        #endregion

        #region Count & Time

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified buffer size and window.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window)
        {
            _implementation = new ReplayByTime(bufferSize, window);
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}" /> class with the specified buffer size, window and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(bufferSize, window, scheduler);
        }

        #endregion

        #endregion

        #region Properties

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// </summary>
        public override bool HasObservers
        {
            get { return _implementation.HasObservers; }
        }

        /// <summary>
        /// Indicates whether the subject has been disposed.
        /// </summary>
        public override bool IsDisposed
        {
            get { return _implementation.IsDisposed; }
        }

        #endregion

        #region Methods

        #region IObserver<T> implementation

        /// <summary>
        /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all observers.</param>
        public override void OnNext(T value)
        {
            _implementation.OnNext(value);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the specified exception.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        public override void OnError(Exception error)
        {
            if (error == null)
                throw new ArgumentNullException("error");

            _implementation.OnError(error);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the end of the sequence.
        /// </summary>
        public override void OnCompleted()
        {
            _implementation.OnCompleted();
        }

        #endregion

        #region IObservable<T> implementation

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
                throw new ArgumentNullException("observer");

            return _implementation.Subscribe(observer);
        }

        #endregion

        #region IDisposable implementation

        /// <summary>
        /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject{T}"/> class and unsubscribes all observers.
        /// </summary>
        public override void Dispose()
        {
            _implementation.Dispose();
        }

        #endregion

        #endregion

        /// <summary>
        /// Common base for the replay implementations. It owns the observer list, the terminal
        /// state and the gate protecting them; derived classes supply the buffer through
        /// <see cref="Next"/>, <see cref="Replay"/> and <see cref="Trim"/>.
        /// </summary>
        private abstract class ReplayBase : SubjectBase<T>
        {
            /// <summary>
            /// Gate protecting the fields below and serializing calls into the derived class.
            /// </summary>
            private readonly object _gate = new object();

            /// <summary>
            /// Currently subscribed observers; set to null by <see cref="Dispose"/>.
            /// </summary>
            private ImmutableList<IScheduledObserver<T>> _observers;

            /// <summary>
            /// Set once OnError or OnCompleted has been received.
            /// </summary>
            private bool _isStopped;

            /// <summary>
            /// Exception received through OnError, if any.
            /// </summary>
            private Exception _error;

            /// <summary>
            /// Set once <see cref="Dispose"/> has been called.
            /// </summary>
            private bool _isDisposed;

            /// <summary>
            /// Initializes the subject with no observers and no terminal state.
            /// </summary>
            public ReplayBase()
            {
                _observers = ImmutableList<IScheduledObserver<T>>.Empty;

                _isStopped = false;
                _error = null;
            }

            /// <summary>
            /// Indicates whether the subject has observers subscribed to it.
            /// </summary>
            public override bool HasObservers
            {
                get
                {
                    //
                    // Read the field once into a local: Dispose sets it to null.
                    //
                    var observers = _observers;
                    return observers != null && observers.Data.Length > 0;
                }
            }

            /// <summary>
            /// Indicates whether the subject has been disposed.
            /// </summary>
            public override bool IsDisposed
            {
                get
                {
                    lock (_gate)
                    {
                        return _isDisposed;
                    }
                }
            }

            /// <summary>
            /// Buffers the value, trims the buffer and enqueues the value on every subscribed
            /// observer; the observers are activated after the gate has been released.
            /// </summary>
            /// <param name="value">The value to send to all observers.</param>
            /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
            public override void OnNext(T value)
            {
                var o = default(IScheduledObserver<T>[]);
                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        Next(value);
                        Trim();

                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnNext(value);
                    }
                }

                if (o != null)
                {
                    foreach (var observer in o)
                        observer.EnsureActive();
                }
            }

            /// <summary>
            /// Records the error as the terminal state, enqueues it on every subscribed observer
            /// and clears the observer list; the observers are activated after the gate has been released.
            /// </summary>
            /// <param name="error">The exception to send to all observers.</param>
            /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
            public override void OnError(Exception error)
            {
                var o = default(IScheduledObserver<T>[]);
                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        _isStopped = true;
                        _error = error;
                        Trim();

                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnError(error);

                        _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                    }
                }

                if (o != null)
                {
                    foreach (var observer in o)
                        observer.EnsureActive();
                }
            }

            /// <summary>
            /// Records completion as the terminal state, enqueues it on every subscribed observer
            /// and clears the observer list; the observers are activated after the gate has been released.
            /// </summary>
            /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
            public override void OnCompleted()
            {
                var o = default(IScheduledObserver<T>[]);
                lock (_gate)
                {
                    CheckDisposed();

                    if (!_isStopped)
                    {
                        _isStopped = true;
                        Trim();

                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnCompleted();

                        _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                    }
                }

                if (o != null)
                {
                    foreach (var observer in o)
                        observer.EnsureActive();
                }
            }

            /// <summary>
            /// Replays the buffered notifications (and the terminal notification, if any) to the
            /// observer and, when the subject is still active, registers it for future notifications.
            /// </summary>
            /// <param name="observer">Observer to subscribe to the subject.</param>
            /// <returns>
            /// Disposable that unsubscribes the observer; <see cref="Disposable.Empty"/> when the
            /// subject has already terminated.
            /// </returns>
            /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
            public override IDisposable Subscribe(IObserver<T> observer)
            {
                var so = CreateScheduledObserver(observer);

                var n = 0;

                var subscription = Disposable.Empty;

                lock (_gate)
                {
                    CheckDisposed();

                    //
                    // Notice the v1.x behavior of always calling Trim is preserved here.
                    //
                    // This may be subject (pun intended) of debate: should this policy
                    // only be applied while the sequence is active? With the current
                    // behavior, a sequence will "die out" after it has terminated by
                    // continuing to drop OnNext notifications from the queue.
                    //
                    // In v1.x, this behavior was due to trimming based on the clock value
                    // returned by scheduler.Now, applied to all but the terminal message
                    // in the queue. Using the IStopwatch has the same effect. Either way,
                    // we guarantee the final notification will be observed, but there's
                    // no way to retain the buffer directly. One approach is to use the
                    // time-based TakeLast operator and apply an unbounded ReplaySubject
                    // to it.
                    //
                    // To conclude, we're keeping the behavior as-is for compatibility
                    // reasons with v1.x.
                    //
                    Trim();

                    n = Replay(so);

                    if (_error != null)
                    {
                        n++;
                        so.OnError(_error);
                    }
                    else if (_isStopped)
                    {
                        n++;
                        so.OnCompleted();
                    }

                    if (!_isStopped)
                    {
                        subscription = new Subscription(this, so);
                        _observers = _observers.Add(so);
                    }
                }

                so.EnsureActive(n);

                return subscription;
            }

            /// <summary>
            /// Marks the subject as disposed, drops the observer list and lets the derived
            /// class release its buffer.
            /// </summary>
            public override void Dispose()
            {
                lock (_gate)
                {
                    _isDisposed = true;
                    _observers = null;
                    DisposeCore();
                }
            }

            /// <summary>
            /// Releases the buffer held by the derived class. Called under the gate.
            /// </summary>
            protected abstract void DisposeCore();

            /// <summary>
            /// Adds a value to the replay buffer. Called under the gate.
            /// </summary>
            /// <param name="value">Value to buffer.</param>
            protected abstract void Next(T value);

            /// <summary>
            /// Sends the buffered values to the specified observer. Called under the gate.
            /// </summary>
            /// <param name="observer">Observer to replay the buffer to.</param>
            /// <returns>Number of OnNext notifications sent.</returns>
            protected abstract int Replay(IObserver<T> observer);

            /// <summary>
            /// Applies the buffer trimming policy. Called under the gate.
            /// </summary>
            protected abstract void Trim();

            /// <summary>
            /// Wraps the specified observer in the scheduled observer used by this implementation.
            /// </summary>
            /// <param name="observer">Observer to wrap.</param>
            /// <returns>Scheduled observer forwarding to <paramref name="observer"/>.</returns>
            protected abstract IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer);

            /// <summary>
            /// Throws if the subject has been disposed. Callers hold the gate.
            /// </summary>
            private void CheckDisposed()
            {
                if (_isDisposed)
                    throw new ObjectDisposedException(string.Empty);
            }

            /// <summary>
            /// Removes the observer from the observer list, unless the subject has been disposed.
            /// </summary>
            /// <param name="observer">Observer to remove.</param>
            private void Unsubscribe(IScheduledObserver<T> observer)
            {
                lock (_gate)
                {
                    if (!_isDisposed)
                    {
                        _observers = _observers.Remove(observer);
                    }
                }
            }

#if NOTYET // TODO: Expose internal notifications similar to BehaviorSubject<T>.TryGetValue?

            public bool TryGetNotifications(out IList<Notification<T>> notifications)
            {
                lock (_gate)
                {
                    if (_isDisposed)
                    {
                        notifications = null;
                        return false;
                    }
                    else
                    {
                        var res = new List<Notification<T>>();

                        var materializer = Observer.Create<T>(
                            x => res.Add(Notification.CreateOnNext(x)),
                            ex => res.Add(Notification.CreateOnError<T>(ex)),
                            () => res.Add(Notification.CreateOnCompleted<T>())
                        );

                        Replay(materializer);

                        if (_error != null)
                        {
                            materializer.OnError(_error);
                        }
                        else if (_isStopped)
                        {
                            materializer.OnCompleted();
                        }

                        notifications = res;
                        return true;
                    }
                }
            }

#endif

            /// <summary>
            /// Subscription handle returned from <see cref="ReplayBase.Subscribe"/>; disposing it
            /// disposes the scheduled observer and removes it from the subject.
            /// </summary>
            private sealed class Subscription : IDisposable
            {
                private readonly ReplayBase _subject;
                private readonly IScheduledObserver<T> _observer;

                public Subscription(ReplayBase subject, IScheduledObserver<T> observer)
                {
                    _subject = subject;
                    _observer = observer;
                }

                public void Dispose()
                {
                    _observer.Dispose();
                    _subject.Unsubscribe(_observer);
                }
            }
        }

        /// <summary>
        /// Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
        /// </summary>
        private sealed class ReplayByTime : ReplayBase
        {
            private const int InfiniteBufferSize = int.MaxValue;

            private readonly int _bufferSize;
            private readonly TimeSpan _window;
            private readonly IScheduler _scheduler;
            private readonly IStopwatch _stopwatch;

            /// <summary>
            /// Buffered values, each paired with the stopwatch reading taken when it was received.
            /// </summary>
            private readonly Queue<TimeInterval<T>> _queue;

            /// <summary>
            /// Creates the implementation with the specified buffer size, window and scheduler.
            /// </summary>
            /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
            /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
            public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
            {
                if (bufferSize < 0)
                    throw new ArgumentOutOfRangeException("bufferSize");
                if (window < TimeSpan.Zero)
                    throw new ArgumentOutOfRangeException("window");
                if (scheduler == null)
                    throw new ArgumentNullException("scheduler");

                _bufferSize = bufferSize;
                _window = window;
                _scheduler = scheduler;

                _stopwatch = _scheduler.StartStopwatch();
                _queue = new Queue<TimeInterval<T>>();
            }

            public ReplayByTime(int bufferSize, TimeSpan window)
                : this(bufferSize, window, SchedulerDefaults.Iteration)
            {
            }

            public ReplayByTime(IScheduler scheduler)
                : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
            {
            }

            public ReplayByTime(int bufferSize, IScheduler scheduler)
                : this(bufferSize, TimeSpan.MaxValue, scheduler)
            {
            }

            public ReplayByTime(TimeSpan window, IScheduler scheduler)
                : this(InfiniteBufferSize, window, scheduler)
            {
            }

            public ReplayByTime(TimeSpan window)
                : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
            {
            }

            protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
            {
                return new ScheduledObserver<T>(_scheduler, observer);
            }

            protected override void DisposeCore()
            {
                _queue.Clear();
            }

            /// <summary>
            /// Enqueues the value together with the current stopwatch reading.
            /// </summary>
            protected override void Next(T value)
            {
                var now = _stopwatch.Elapsed;

                _queue.Enqueue(new TimeInterval<T>(value, now));
            }

            protected override int Replay(IObserver<T> observer)
            {
                var n = _queue.Count;

                foreach (var item in _queue)
                    observer.OnNext(item.Value);

                return n;
            }

            /// <summary>
            /// Drops the oldest items until at most the configured number remain, then drops
            /// items whose age (current stopwatch reading minus recorded reading) exceeds the window.
            /// </summary>
            protected override void Trim()
            {
                var now = _stopwatch.Elapsed;

                while (_queue.Count > _bufferSize)
                    _queue.Dequeue();

                while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
                    _queue.Dequeue();
            }
        }

        //
        // Below are the non-time based implementations.
        // These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
        // The ReplayOne implementation also removes the need to even have a queue.
        //

        /// <summary>
        /// Replay implementation that retains only the most recent value, without a queue.
        /// </summary>
        private sealed class ReplayOne : ReplayBufferBase
        {
            private bool _hasValue;
            private T _value;

            protected override void Trim()
            {
                //
                // No need to trim.
                //
            }

            protected override void Next(T value)
            {
                _hasValue = true;
                _value = value;
            }

            protected override int Replay(IObserver<T> observer)
            {
                var n = 0;

                if (_hasValue)
                {
                    n = 1;
                    observer.OnNext(_value);
                }

                return n;
            }

            protected override void DisposeCore()
            {
                _value = default(T);
            }
        }

        /// <summary>
        /// Replay implementation that retains at most a fixed number of values.
        /// </summary>
        private sealed class ReplayMany : ReplayManyBase
        {
            private readonly int _bufferSize;

            public ReplayMany(int bufferSize)
                : base(bufferSize)
            {
                _bufferSize = bufferSize;
            }

            protected override void Trim()
            {
                while (_queue.Count > _bufferSize)
                    _queue.Dequeue();
            }
        }

        /// <summary>
        /// Replay implementation that retains every value.
        /// </summary>
        private sealed class ReplayAll : ReplayManyBase
        {
            public ReplayAll()
                : base(0)
            {
            }

            protected override void Trim()
            {
                //
                // Don't trim, keep all values.
                //
            }
        }

        /// <summary>
        /// Base for the non-time based implementations; wraps observers in a
        /// <see cref="FastImmediateObserver{T}"/> instead of a scheduler-bound observer.
        /// </summary>
        private abstract class ReplayBufferBase : ReplayBase
        {
            protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
            {
                return new FastImmediateObserver<T>(observer);
            }

            protected override void DisposeCore()
            {
            }
        }

        /// <summary>
        /// Base for the queue-backed, non-time based implementations; derived classes
        /// supply the trimming policy.
        /// </summary>
        private abstract class ReplayManyBase : ReplayBufferBase
        {
            protected readonly Queue<T> _queue;

            /// <summary>
            /// Creates the queue with an initial capacity of <paramref name="queueSize"/>, capped at 64.
            /// </summary>
            protected ReplayManyBase(int queueSize)
                : base()
            {
                _queue = new Queue<T>(Math.Min(queueSize, 64));
            }

            protected override void Next(T value)
            {
                _queue.Enqueue(value);
            }

            protected override int Replay(IObserver<T> observer)
            {
                var n = _queue.Count;

                foreach (var item in _queue)
                    observer.OnNext(item);

                return n;
            }

            protected override void DisposeCore()
            {
                _queue.Clear();
            }
        }
    }

    /// <summary>
    /// Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
    /// </summary>
    /// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
    class FastImmediateObserver<T> : IScheduledObserver<T>
    {
        /// <summary>
        /// Gate to control ownership transfer and protect data structures.
        /// </summary>
        private readonly object _gate = new object();

        /// <summary>
        /// Observer to forward notifications to.
        /// </summary>
        private volatile IObserver<T> _observer;

        /// <summary>
        /// Queue to enqueue OnNext notifications into.
        /// </summary>
        private Queue<T> _queue = new Queue<T>();

        /// <summary>
        /// Standby queue to swap out for _queue when transferring ownership. This allows to reuse
        /// queues in case of busy subjects where the initial replay doesn't suffice to catch up.
        /// </summary>
        private Queue<T> _queue2;

        /// <summary>
        /// Exception passed to an OnError notification, if any.
        /// </summary>
        private Exception _error;

        /// <summary>
        /// Indicates whether an OnCompleted notification was received.
        /// </summary>
        private bool _done;

        /// <summary>
        /// Indicates whether the observer is busy, i.e. some thread is actively draining the
        /// notifications that were queued up.
        /// </summary>
        private bool _busy;

        /// <summary>
        /// Indicates whether a failure occurred when the owner was draining the queue. This will
        /// prevent future work to be processed.
        /// </summary>
        private bool _hasFaulted;

        /// <summary>
        /// Creates a new scheduled observer that proxies to the specified observer.
        /// </summary>
        /// <param name="observer">Observer to forward notifications to.</param>
        public FastImmediateObserver(IObserver<T> observer)
        {
            _observer = observer;
        }

        /// <summary>
        /// Disposes the observer.
        /// </summary>
        public void Dispose()
        {
            Done();
        }

        /// <summary>
        /// Notifies the observer of pending work. This will either cause the current owner to
        /// process the newly enqueued notifications, or it will cause the calling thread to
        /// become the owner and start processing the notification queue.
        /// </summary>
        public void EnsureActive()
        {
            EnsureActive(1);
        }

        /// <summary>
        /// Notifies the observer of pending work. This will either cause the current owner to
        /// process the newly enqueued notifications, or it will cause the calling thread to
        /// become the owner and start processing the notification queue.
        /// </summary>
        /// <param name="count">The number of enqueued notifications to process (ignored).</param>
        public void EnsureActive(int count)
        {
            var isOwner = false;

            lock (_gate)
            {
                //
                // If we failed to process work in the past, we'll simply drop it.
                //
                if (!_hasFaulted)
                {
                    //
                    // If no-one is processing the notification queue, become the owner.
                    //
                    if (!_busy)
                    {
                        isOwner = true;
                        _busy = true;
                    }
                }
            }

            if (isOwner)
            {
                while (true)
                {
                    var queue = default(Queue<T>);
                    var error = default(Exception);
                    var done = false;

                    //
                    // Steal notifications from the producer side to drain them to the observer.
                    //
                    lock (_gate)
                    {
                        //
                        // Do we have any OnNext notifications to process?
                        //
                        if (_queue.Count > 0)
                        {
                            if (_queue2 == null)
                            {
                                _queue2 = new Queue<T>();
                            }

                            //
                            // Swap out the current queue for a fresh or recycled one. The standby
                            // queue is set to null; when notifications are sent out the processed
                            // queue will become the new standby.
                            //
                            queue = _queue;
                            _queue = _queue2;
                            _queue2 = null;
                        }

                        //
                        // Do we have any terminal notifications to process?
                        //
                        if (_error != null)
                        {
                            error = _error;
                        }
                        else if (_done)
                        {
                            done = true;
                        }
                        else if (queue == null)
                        {
                            //
                            // No work left; quit the loop and let another thread become the
                            // owner in the future.
                            //
                            _busy = false;
                            break;
                        }
                    }

                    try
                    {
                        //
                        // Process OnNext notifications, if any.
                        //
                        if (queue != null)
                        {
                            //
                            // Drain the stolen OnNext notification queue.
                            //
                            while (queue.Count > 0)
                            {
                                _observer.OnNext(queue.Dequeue());
                            }

                            //
                            // The queue is now empty, so we can reuse it by making it the standby
                            // queue for a future swap.
                            //
                            lock (_gate)
                            {
                                _queue2 = queue;
                            }
                        }

                        //
                        // Process terminal notifications, if any. Notice we don't release ownership
                        // after processing these notifications; we simply quit from the loop. This
                        // will cause all processing of the scheduler observer to cease.
                        //
                        if (error != null)
                        {
                            var observer = Done();
                            observer.OnError(error);
                            break;
                        }
                        else if (done)
                        {
                            var observer = Done();
                            observer.OnCompleted();
                            break;
                        }
                    }
                    catch
                    {
                        lock (_gate)
                        {
                            _hasFaulted = true;
                            _queue.Clear();
                        }

                        throw;
                    }
                }
            }
        }

        /// <summary>
        /// Enqueues an OnCompleted notification.
        /// </summary>
        public void OnCompleted()
        {
            lock (_gate)
            {
                if (!_hasFaulted)
                {
                    _done = true;
                }
            }
        }

        /// <summary>
        /// Enqueues an OnError notification.
        /// </summary>
        /// <param name="error">Error of the notification.</param>
        public void OnError(Exception error)
        {
            lock (_gate)
            {
                if (!_hasFaulted)
                {
                    _error = error;
                }
            }
        }

        /// <summary>
        /// Enqueues an OnNext notification.
        /// </summary>
        /// <param name="value">Value of the notification.</param>
        public void OnNext(T value)
        {
            lock (_gate)
            {
                if (!_hasFaulted)
                {
                    _queue.Enqueue(value);
                }
            }
        }

        /// <summary>
        /// Terminates the observer upon receiving terminal notifications, thus preventing
        /// future notifications to go out.
        /// </summary>
        /// <returns>Observer to send terminal notifications to.</returns>
        private IObserver<T> Done()
        {
            return Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
        }
    }
}