Browse Source

Adding ReplaySubject perf improvements

Lee Campbell 12 years ago
parent
commit
5449869cd7

+ 574 - 187
Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs

@@ -2,6 +2,7 @@
 
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
+using System.Threading;
 
 namespace System.Reactive.Subjects
 {
@@ -12,21 +13,7 @@ namespace System.Reactive.Subjects
     /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
     public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
     {
-        private const int InfiniteBufferSize = int.MaxValue;
-
-        private readonly int _bufferSize;
-        private readonly TimeSpan _window;
-        private readonly IScheduler _scheduler;
-        private readonly IStopwatch _stopwatch;
-
-        private readonly Queue<TimeInterval<T>> _queue;
-        private bool _isStopped;
-        private Exception _error;
-
-        private ImmutableList<ScheduledObserver<T>> _observers;
-        private bool _isDisposed;
-        
-        private readonly object _gate = new object();
+        private readonly IReplaySubjectImplementation _implementation;
 
         /// <summary>
         /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size, window and scheduler.
@@ -38,23 +25,7 @@ namespace System.Reactive.Subjects
         /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
         public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
         {
-            if (bufferSize < 0)
-                throw new ArgumentOutOfRangeException("bufferSize");
-            if (window < TimeSpan.Zero)
-                throw new ArgumentOutOfRangeException("window");
-            if (scheduler == null)
-                throw new ArgumentNullException("scheduler");
-
-            _bufferSize = bufferSize;
-            _window = window;
-            _scheduler = scheduler;
-
-            _stopwatch = _scheduler.StartStopwatch();
-            _queue = new Queue<TimeInterval<T>>();
-            _isStopped = false;
-            _error = null;
-
-            _observers = new ImmutableList<ScheduledObserver<T>>();
+            _implementation = new ReplayByTime(bufferSize, window, scheduler);
         }
 
         /// <summary>
@@ -64,16 +35,16 @@ namespace System.Reactive.Subjects
         /// <param name="window">Maximum time length of the replay buffer.</param>
         /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
         public ReplaySubject(int bufferSize, TimeSpan window)
-            : this(bufferSize, window, SchedulerDefaults.Iteration)
         {
+            _implementation = new ReplayByTime(bufferSize, window);
         }
 
         /// <summary>
         /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class.
         /// </summary>
         public ReplaySubject()
-            : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
         {
+            _implementation = new ReplayAll();
         }
 
         /// <summary>
@@ -82,10 +53,12 @@ namespace System.Reactive.Subjects
         /// <param name="scheduler">Scheduler the observers are invoked on.</param>
         /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
         public ReplaySubject(IScheduler scheduler)
-            : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
         {
+            _implementation = new ReplayByTime(scheduler);
         }
 
+        //TODO: Does this overload make any sense with the optimisations? Surely this now is just <c>new ReplaySubject<T>(bufferSize).SubscribeOn(scheduler)</c>?
+        //Potentially should be marked as obsolete
         /// <summary>
         /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size and scheduler.
         /// </summary>
@@ -94,8 +67,8 @@ namespace System.Reactive.Subjects
         /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
         /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
         public ReplaySubject(int bufferSize, IScheduler scheduler)
-            : this(bufferSize, TimeSpan.MaxValue, scheduler)
         {
+            _implementation = new ReplayByTime(bufferSize, scheduler);
         }
 
         /// <summary>
@@ -104,8 +77,19 @@ namespace System.Reactive.Subjects
         /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
         /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
         public ReplaySubject(int bufferSize)
-            : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
         {
+            switch (bufferSize)
+            {
+                case 1:
+                    _implementation = new ReplayOne();
+                    break;
+                case int.MaxValue:
+                    _implementation = new ReplayAll();
+                    break;
+                default:
+                    _implementation = new ReplayMany(bufferSize);
+                    break;
+            }
         }
 
         /// <summary>
@@ -116,8 +100,8 @@ namespace System.Reactive.Subjects
         /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
         /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
         public ReplaySubject(TimeSpan window, IScheduler scheduler)
-            : this(InfiniteBufferSize, window, scheduler)
         {
+            _implementation = new ReplayByTime(window, scheduler);
         }
 
         /// <summary>
@@ -126,8 +110,8 @@ namespace System.Reactive.Subjects
         /// <param name="window">Maximum time length of the replay buffer.</param>
         /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
         public ReplaySubject(TimeSpan window)
-            : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
         {
+            _implementation = new ReplayByTime(window);
         }
 
         /// <summary>
@@ -135,19 +119,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         public bool HasObservers
         {
-            get
-            {
-                var observers = _observers;
-                return observers != null && observers.Data.Length > 0;
-            }
-        }
-
-        void Trim(TimeSpan now)
-        {
-            while (_queue.Count > _bufferSize)
-                _queue.Dequeue();
-            while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
-                _queue.Dequeue();
+            get { return _implementation.HasObservers; }
         }
 
         /// <summary>
@@ -156,26 +128,7 @@ namespace System.Reactive.Subjects
         /// <param name="value">The value to send to all observers.</param>
         public void OnNext(T value)
         {
-            var o = default(ScheduledObserver<T>[]);
-            lock (_gate)
-            {
-                CheckDisposed();
-
-                if (!_isStopped)
-                {
-                    var now = _stopwatch.Elapsed;
-                    _queue.Enqueue(new TimeInterval<T>(value, now));
-                    Trim(now);
-
-                    o = _observers.Data;
-                    foreach (var observer in o)
-                        observer.OnNext(value);
-                }
-            }
-
-            if (o != null)
-                foreach (var observer in o)
-                    observer.EnsureActive();
+            _implementation.OnNext(value);
         }
 
         /// <summary>
@@ -185,169 +138,603 @@ namespace System.Reactive.Subjects
         /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
         public void OnError(Exception error)
         {
-            if (error == null)
-                throw new ArgumentNullException("error");
+            _implementation.OnError(error);
+        }
 
-            var o = default(ScheduledObserver<T>[]);
-            lock (_gate)
-            {
-                CheckDisposed();
+        /// <summary>
+        /// Notifies all subscribed and future observers about the end of the sequence.
+        /// </summary>
+        public void OnCompleted()
+        {
+            _implementation.OnCompleted();
+        }
 
-                if (!_isStopped)
-                {
-                    var now = _stopwatch.Elapsed;
-                    _isStopped = true;
-                    _error = error;
-                    Trim(now);
+        /// <summary>
+        /// Subscribes an observer to the subject.
+        /// </summary>
+        /// <param name="observer">Observer to subscribe to the subject.</param>
+        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
+        public IDisposable Subscribe(IObserver<T> observer)
+        {
+            return _implementation.Subscribe(observer);
+        }
 
-                    o = _observers.Data;
-                    foreach (var observer in o)
-                        observer.OnError(error);
+        /// <summary>
+        /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> class and unsubscribe all observers.
+        /// </summary>
+        public void Dispose()
+        {
+            _implementation.Dispose();
+        }
 
-                    _observers = new ImmutableList<ScheduledObserver<T>>();
-                }
+        private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
+        {
+            bool HasObservers { get; }
+            void Unsubscribe(IObserver<T> observer);
+        }
+
+        private class Subscription : IDisposable
+        {
+            private IReplaySubjectImplementation _subject;
+            private IObserver<T> _observer;
+
+            public Subscription(IReplaySubjectImplementation subject, IObserver<T> observer)
+            {
+                _subject = subject;
+                _observer = observer;
             }
 
-            if (o != null)
-                foreach (var observer in o)
-                    observer.EnsureActive();
+            public void Dispose()
+            {
+                var observer = Interlocked.Exchange(ref _observer, null);
+                if (observer == null)
+                    return;
+
+                _subject.Unsubscribe(observer);
+                _subject = null;
+            }
         }
 
-        /// <summary>
-        /// Notifies all subscribed and future observers about the end of the sequence.
-        /// </summary>
-        public void OnCompleted()
+        //Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
+        private sealed class ReplayByTime : IReplaySubjectImplementation
         {
-            var o = default(ScheduledObserver<T>[]);
-            lock (_gate)
+            private const int InfiniteBufferSize = int.MaxValue;
+
+            private readonly int _bufferSize;
+            private readonly TimeSpan _window;
+            private readonly IScheduler _scheduler;
+            private readonly IStopwatch _stopwatch;
+
+            private readonly Queue<TimeInterval<T>> _queue;
+            private bool _isStopped;
+            private Exception _error;
+
+            private ImmutableList<ScheduledObserver<T>> _observers;
+            private bool _isDisposed;
+
+            private readonly object _gate = new object();
+
+            public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
+            {
+                if (bufferSize < 0)
+                    throw new ArgumentOutOfRangeException("bufferSize");
+                if (window < TimeSpan.Zero)
+                    throw new ArgumentOutOfRangeException("window");
+                if (scheduler == null)
+                    throw new ArgumentNullException("scheduler");
+
+                _bufferSize = bufferSize;
+                _window = window;
+                _scheduler = scheduler;
+
+                _stopwatch = _scheduler.StartStopwatch();
+                _queue = new Queue<TimeInterval<T>>();
+                _isStopped = false;
+                _error = null;
+
+                _observers = new ImmutableList<ScheduledObserver<T>>();
+            }
+
+            public ReplayByTime(int bufferSize, TimeSpan window)
+                : this(bufferSize, window, SchedulerDefaults.Iteration)
+            {
+            }
+
+            public ReplayByTime(IScheduler scheduler)
+                : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
+            {
+            }
+
+            public ReplayByTime(int bufferSize, IScheduler scheduler)
+                : this(bufferSize, TimeSpan.MaxValue, scheduler)
+            {
+            }
+
+            public ReplayByTime(TimeSpan window, IScheduler scheduler)
+                : this(InfiniteBufferSize, window, scheduler)
+            {
+            }
+
+            public ReplayByTime(TimeSpan window)
+                : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
             {
-                CheckDisposed();
+            }
+
+            public bool HasObservers
+            {
+                get
+                {
+                    var observers = _observers;
+                    return observers != null && observers.Data.Length > 0;
+                }
+            }
+
+            private void Trim(TimeSpan now)
+            {
+                while (_queue.Count > _bufferSize)
+                    _queue.Dequeue();
+                while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
+                    _queue.Dequeue();
+            }
 
-                if (!_isStopped)
+            public void OnNext(T value)
+            {
+                var o = default(ScheduledObserver<T>[]);
+                lock (_gate)
                 {
-                    var now = _stopwatch.Elapsed;
-                    _isStopped = true;
-                    Trim(now);
+                    CheckDisposed();
+
+                    if (!_isStopped)
+                    {
+                        var now = _stopwatch.Elapsed;
+                        _queue.Enqueue(new TimeInterval<T>(value, now));
+                        Trim(now);
+
+                        o = _observers.Data;
+                        foreach (var observer in o)
+                            observer.OnNext(value);
+                    }
+                }
 
-                    o = _observers.Data;
+                if (o != null)
                     foreach (var observer in o)
-                        observer.OnCompleted();
+                        observer.EnsureActive();
+            }
+
+            public void OnError(Exception error)
+            {
+                if (error == null)
+                    throw new ArgumentNullException("error");
 
-                    _observers = new ImmutableList<ScheduledObserver<T>>();
+                var o = default(ScheduledObserver<T>[]);
+                lock (_gate)
+                {
+                    CheckDisposed();
+
+                    if (!_isStopped)
+                    {
+                        var now = _stopwatch.Elapsed;
+                        _isStopped = true;
+                        _error = error;
+                        Trim(now);
+
+                        o = _observers.Data;
+                        foreach (var observer in o)
+                            observer.OnError(error);
+
+                        _observers = new ImmutableList<ScheduledObserver<T>>();
+                    }
                 }
+
+                if (o != null)
+                    foreach (var observer in o)
+                        observer.EnsureActive();
             }
 
-            if (o != null)
-                foreach (var observer in o)
-                    observer.EnsureActive();
-        }
+            public void OnCompleted()
+            {
+                var o = default(ScheduledObserver<T>[]);
+                lock (_gate)
+                {
+                    CheckDisposed();
 
-        /// <summary>
-        /// Subscribes an observer to the subject.
-        /// </summary>
-        /// <param name="observer">Observer to subscribe to the subject.</param>
-        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
-        public IDisposable Subscribe(IObserver<T> observer)
-        {
-            if (observer == null)
-                throw new ArgumentNullException("observer");
-
-            var so = new ScheduledObserver<T>(_scheduler, observer);
-
-            var n = 0;
-
-            var subscription = new RemovableDisposable(this, so);
-            lock (_gate)
-            {
-                CheckDisposed();
-
-                //
-                // Notice the v1.x behavior of always calling Trim is preserved here.
-                //
-                // This may be subject (pun intended) of debate: should this policy
-                // only be applied while the sequence is active? With the current
-                // behavior, a sequence will "die out" after it has terminated by
-                // continuing to drop OnNext notifications from the queue.
-                //
-                // In v1.x, this behavior was due to trimming based on the clock value
-                // returned by scheduler.Now, applied to all but the terminal message
-                // in the queue. Using the IStopwatch has the same effect. Either way,
-                // we guarantee the final notification will be observed, but there's
-                // no way to retain the buffer directly. One approach is to use the
-                // time-based TakeLast operator and apply an unbounded ReplaySubject
-                // to it.
-                //
-                // To conclude, we're keeping the behavior as-is for compatibility
-                // reasons with v1.x.
-                //
-                Trim(_stopwatch.Elapsed);
-                _observers = _observers.Add(so);
-
-                n = _queue.Count;
-                foreach (var item in _queue)
-                    so.OnNext(item.Value);
+                    if (!_isStopped)
+                    {
+                        var now = _stopwatch.Elapsed;
+                        _isStopped = true;
+                        Trim(now);
+
+                        o = _observers.Data;
+                        foreach (var observer in o)
+                            observer.OnCompleted();
+
+                        _observers = new ImmutableList<ScheduledObserver<T>>();
+                    }
+                }
 
-                if (_error != null)
+                if (o != null)
+                    foreach (var observer in o)
+                        observer.EnsureActive();
+            }
+
+            public IDisposable Subscribe(IObserver<T> observer)
+            {
+                if (observer == null)
+                    throw new ArgumentNullException("observer");
+
+                var so = new ScheduledObserver<T>(_scheduler, observer);
+
+                var n = 0;
+
+                //var subscription = new Subscription(this, so);
+                var subscription = new RemovableDisposable(this, so);
+                lock (_gate)
                 {
-                    n++;
-                    so.OnError(_error);
+                    CheckDisposed();
+
+                    //
+                    // Notice the v1.x behavior of always calling Trim is preserved here.
+                    //
+                    // This may be subject (pun intended) of debate: should this policy
+                    // only be applied while the sequence is active? With the current
+                    // behavior, a sequence will "die out" after it has terminated by
+                    // continuing to drop OnNext notifications from the queue.
+                    //
+                    // In v1.x, this behavior was due to trimming based on the clock value
+                    // returned by scheduler.Now, applied to all but the terminal message
+                    // in the queue. Using the IStopwatch has the same effect. Either way,
+                    // we guarantee the final notification will be observed, but there's
+                    // no way to retain the buffer directly. One approach is to use the
+                    // time-based TakeLast operator and apply an unbounded ReplaySubject
+                    // to it.
+                    //
+                    // To conclude, we're keeping the behavior as-is for compatibility
+                    // reasons with v1.x.
+                    //
+                    Trim(_stopwatch.Elapsed);
+                    _observers = _observers.Add(so);
+
+                    n = _queue.Count;
+                    foreach (var item in _queue)
+                        so.OnNext(item.Value);
+
+                    if (_error != null)
+                    {
+                        n++;
+                        so.OnError(_error);
+                    }
+                    else if (_isStopped)
+                    {
+                        n++;
+                        so.OnCompleted();
+                    }
                 }
-                else if (_isStopped)
+
+                so.EnsureActive(n);
+
+                return subscription;
+            }
+
+            //public void Unsubscribe(IObserver<T> observer)
+            public void Unsubscribe(ScheduledObserver<T> observer)
+            {
+                lock (_gate)
+                {
+                    //var so = (ScheduledObserver<T>)observer;
+                    //so.Dispose();
+                    if (!_isDisposed)
+                        _observers = _observers.Remove(observer);
+                }
+            }
+            //public void Unsubscribe(IObserver<T> observer)
+            void IReplaySubjectImplementation.Unsubscribe(IObserver<T> observer)
+            {
+                var so = (ScheduledObserver<T>)observer;
+                Unsubscribe(so);
+            }
+
+            sealed class RemovableDisposable : IDisposable
+            {
+                private readonly ReplayByTime _subject;
+                private readonly ScheduledObserver<T> _observer;
+
+                public RemovableDisposable(ReplayByTime subject, ScheduledObserver<T> observer)
+                {
+                    _subject = subject;
+                    _observer = observer;
+                }
+
+                public void Dispose()
                 {
-                    n++;
-                    so.OnCompleted();
+                    _observer.Dispose();
+                    _subject.Unsubscribe(_observer);
                 }
             }
 
-            so.EnsureActive(n);
+            private void CheckDisposed()
+            {
+                if (_isDisposed)
+                    throw new ObjectDisposedException(string.Empty);
+            }
 
-            return subscription;
+            public void Dispose()
+            {
+                lock (_gate)
+                {
+                    _isDisposed = true;
+                    _observers = null;
+                    //_queue.Clear();
+                }
+            }
         }
 
-        void Unsubscribe(ScheduledObserver<T> observer)
+
+
+
+
+
+
+
+
+
+
+
+
+        //Below are the non-time based implementations. 
+        //These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
+        //The ReplayOne implementation also removes the need to even have a queue.
+        private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
         {
-            lock (_gate)
+            private bool _hasValue;
+            private T _value;
+
+            protected override void Trim()
             {
-                if (!_isDisposed)
-                    _observers = _observers.Remove(observer);
+                //NoOp. No need to trim.
+            }
+
+            protected override void AddValueToBuffer(T value)
+            {
+                _hasValue = true;
+                _value = value;
+            }
+
+            protected override void ReplayBuffer(IObserver<T> observer)
+            {
+                if (_hasValue)
+                    observer.OnNext(_value);
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                base.Dispose(disposing);
+                _value = default(T);
             }
         }
 
-        sealed class RemovableDisposable : IDisposable
+        private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
         {
-            private readonly ReplaySubject<T> _subject;
-            private readonly ScheduledObserver<T> _observer;
+            private readonly int _bufferSize;
 
-            public RemovableDisposable(ReplaySubject<T> subject, ScheduledObserver<T> observer)
+            public ReplayMany(int bufferSize)
+                : base(bufferSize)
             {
-                _subject = subject;
-                _observer = observer;
+                _bufferSize = bufferSize;
             }
 
-            public void Dispose()
+            protected override void Trim()
             {
-                _observer.Dispose();
-                _subject.Unsubscribe(_observer);
+                while (Queue.Count > _bufferSize)
+                    Queue.Dequeue();
             }
         }
 
-        void CheckDisposed()
+        private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
         {
-            if (_isDisposed)
-                throw new ObjectDisposedException(string.Empty);
+            public ReplayAll()
+                : base(0)
+            {
+            }
+
+            protected override void Trim()
+            {
+                //NoOp; i.e. Dont' trim, keep all values.
+            }
         }
 
-        /// <summary>
-        /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> class and unsubscribe all observers.
-        /// </summary>
-        public void Dispose()
+        private abstract class ReplayBufferBase : IReplaySubjectImplementation
+        {
+            private readonly object _gate = new object();
+            private bool _isDisposed;
+            private bool _isStopped;
+            private Exception _error;
+            private ImmutableList<IObserver<T>> _observers;
+
+            protected ReplayBufferBase()
+            {
+                _observers = new ImmutableList<IObserver<T>>();
+            }
+
+            protected abstract void Trim();
+            protected abstract void AddValueToBuffer(T value);
+            protected abstract void ReplayBuffer(IObserver<T> observer);
+
+            public bool HasObservers
+            {
+                get
+                {
+                    var observers = _observers;
+                    return observers != null && observers.Data.Length > 0;
+                }
+            }
+
+            public void OnNext(T value)
+            {
+                lock (_gate)
+                {
+                    CheckDisposed();
+
+                    if (!_isStopped)
+                    {
+                        AddValueToBuffer(value);
+                        Trim();
+
+                        var o = _observers.Data;
+                        foreach (var observer in o)
+                            observer.OnNext(value);
+                    }
+                }
+            }
+
+            public void OnError(Exception error)
+            {
+                if (error == null)
+                    throw new ArgumentNullException("error");
+
+                lock (_gate)
+                {
+                    CheckDisposed();
+
+                    if (!_isStopped)
+                    {
+                        _isStopped = true;
+                        _error = error;
+                        Trim();
+
+                        var o = _observers.Data;
+                        foreach (var observer in o)
+                            observer.OnError(error);
+
+                        _observers = new ImmutableList<IObserver<T>>();
+                    }
+                }
+            }
+
+            public void OnCompleted()
+            {
+                lock (_gate)
+                {
+                    CheckDisposed();
+
+                    if (!_isStopped)
+                    {
+                        _isStopped = true;
+                        Trim();
+
+                        var o = _observers.Data;
+                        foreach (var observer in o)
+                            observer.OnCompleted();
+
+                        _observers = new ImmutableList<IObserver<T>>();
+                    }
+                }
+            }
+
+            public IDisposable Subscribe(IObserver<T> observer)
+            {
+                if (observer == null)
+                    throw new ArgumentNullException("observer");
+
+
+                var subscription = new Subscription(this, observer);
+                lock (_gate)
+                {
+                    CheckDisposed();
+
+                    //
+                    // Notice the v1.x behavior of always calling Trim is preserved here.
+                    //
+                    // This may be subject (pun intended) of debate: should this policy
+                    // only be applied while the sequence is active? With the current
+                    // behavior, a sequence will "die out" after it has terminated by
+                    // continuing to drop OnNext notifications from the queue.
+                    //
+                    // In v1.x, this behavior was due to trimming based on the clock value
+                    // returned by scheduler.Now, applied to all but the terminal message
+                    // in the queue. Using the IStopwatch has the same effect. Either way,
+                    // we guarantee the final notification will be observed, but there's
+                    // no way to retain the buffer directly. One approach is to use the
+                    // time-based TakeLast operator and apply an unbounded ReplaySubject
+                    // to it.
+                    //
+                    // To conclude, we're keeping the behavior as-is for compatibility
+                    // reasons with v1.x.
+                    //
+                    _observers = _observers.Add(observer);
+
+                    ReplayBuffer(observer);
+
+                    if (_error != null)
+                    {
+                        observer.OnError(_error);
+                    }
+                    else if (_isStopped)
+                    {
+                        observer.OnCompleted();
+                    }
+                }
+
+                return subscription;
+            }
+
+            public void Unsubscribe(IObserver<T> observer)
+            {
+                lock (_gate)
+                {
+                    if (!_isDisposed)
+                        _observers = _observers.Remove(observer);
+                }
+            }
+
+            private void CheckDisposed()
+            {
+                if (_isDisposed)
+                    throw new ObjectDisposedException(string.Empty);
+            }
+
+            public void Dispose()
+            {
+                Dispose(true);
+            }
+            protected virtual void Dispose(bool disposing)
+            {
+                lock (_gate)
+                {
+                    _isDisposed = true;
+                    _observers = null;
+                }
+            }
+        }
+
+        private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
         {
-            lock (_gate)
+            private readonly Queue<T> _queue;
+
+            protected ReplayManyBase(int queueSize)
+                : base()
+            {
+                _queue = new Queue<T>(queueSize);
+            }
+
+            protected Queue<T> Queue { get { return _queue; } }
+
+            protected override void AddValueToBuffer(T value)
+            {
+                _queue.Enqueue(value);
+            }
+
+            protected override void ReplayBuffer(IObserver<T> observer)
+            {
+                foreach (var item in _queue)
+                    observer.OnNext(item);
+            }
+
+            protected override void Dispose(bool disposing)
             {
-                _isDisposed = true;
-                _observers = null;
+                base.Dispose(disposing);
+                _queue.Clear();
             }
         }
     }
-}
+}

File diff suppressed because it is too large
+ 1011 - 42
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs


Some files were not shown because too many files changed in this diff