Selaa lähdekoodia

Misc. improvements to subjects.

Bart De Smet 8 vuotta sitten
vanhempi
sitoutus
923704993c

+ 21 - 28
Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs

@@ -45,14 +45,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public override bool HasObservers
-        {
-            get
-            {
-                var observers = _observers;
-                return observers != null && observers.Data.Length > 0;
-            }
-        }
+        public override bool HasObservers => _observers?.Data.Length > 0;
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
@@ -108,8 +101,12 @@ namespace System.Reactive.Subjects
                     }
                 }
                 else
+                {
                     foreach (var o in os)
+                    {
                         o.OnCompleted();
+                    }
+                }
             }
         }
 
@@ -117,7 +114,7 @@ namespace System.Reactive.Subjects
         /// Notifies all subscribed observers about the exception.
         /// </summary>
         /// <param name="error">The exception to send to all observers.</param>
-        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
         public override void OnError(Exception error)
         {
             if (error == null)
@@ -138,8 +135,12 @@ namespace System.Reactive.Subjects
             }
 
             if (os != null)
+            {
                 foreach (var o in os)
+                {
                     o.OnError(error);
+                }
+            }
         }
 
         /// <summary>
@@ -169,7 +170,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="observer">Observer to subscribe to the subject.</param>
         /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
         public override IDisposable Subscribe(IObserver<T> observer)
         {
             if (observer == null)
@@ -211,7 +212,7 @@ namespace System.Reactive.Subjects
             return Disposable.Empty;
         }
 
-        class Subscription : IDisposable
+        private sealed class Subscription : IDisposable
         {
             private readonly AsyncSubject<T> _subject;
             private IObserver<T> _observer;
@@ -276,13 +277,13 @@ namespace System.Reactive.Subjects
         /// Specifies a callback action that will be invoked when the subject completes.
         /// </summary>
         /// <param name="continuation">Callback action that will be invoked when the subject completes.</param>
-        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is <c>null</c>.</exception>
         public void OnCompleted(Action continuation)
         {
             if (continuation == null)
                 throw new ArgumentNullException(nameof(continuation));
 
-            OnCompleted(continuation, true);
+            OnCompleted(continuation, originalContext: true);
         }
 
         private void OnCompleted(Action continuation, bool originalContext)
@@ -293,7 +294,7 @@ namespace System.Reactive.Subjects
             this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));
         }
 
-        class AwaitObserver : IObserver<T>
+        private sealed class AwaitObserver : IObserver<T>
         {
             private readonly SynchronizationContext _context;
             private readonly Action _callback;
@@ -302,23 +303,15 @@ namespace System.Reactive.Subjects
             {
                 if (originalContext)
                     _context = SynchronizationContext.Current;
-                
+
                 _callback = callback;
             }
 
-            public void OnCompleted()
-            {
-                InvokeOnOriginalContext();
-            }
+            public void OnCompleted() => InvokeOnOriginalContext();
 
-            public void OnError(Exception error)
-            {
-                InvokeOnOriginalContext();
-            }
+            public void OnError(Exception error) => InvokeOnOriginalContext();
 
-            public void OnNext(T value)
-            {
-            }
+            public void OnNext(T value) { }
 
             private void InvokeOnOriginalContext()
             {
@@ -355,8 +348,8 @@ namespace System.Reactive.Subjects
         {
             if (!_isStopped)
             {
-                var e = new ManualResetEvent(false);
-                OnCompleted(() => e.Set(), false);
+                var e = new ManualResetEvent(initialState: false);
+                OnCompleted(() => e.Set(), originalContext: false);
                 e.WaitOne();
             }
 

+ 14 - 11
Rx.NET/Source/src/System.Reactive/Subjects/BehaviorSubject.cs

@@ -44,14 +44,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public override bool HasObservers
-        {
-            get
-            {
-                var observers = _observers;
-                return observers != null && observers.Data.Length > 0;
-            }
-        }
+        public override bool HasObservers => _observers?.Data.Length > 0;
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
@@ -160,7 +153,9 @@ namespace System.Reactive.Subjects
             if (os != null)
             {
                 foreach (var o in os)
+                {
                     o.OnCompleted();
+                }
             }
         }
 
@@ -168,7 +163,7 @@ namespace System.Reactive.Subjects
         /// Notifies all subscribed observers about the exception.
         /// </summary>
         /// <param name="error">The exception to send to all observers.</param>
-        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
         public override void OnError(Exception error)
         {
             if (error == null)
@@ -191,7 +186,9 @@ namespace System.Reactive.Subjects
             if (os != null)
             {
                 foreach (var o in os)
+                {
                     o.OnError(error);
+                }
             }
         }
 
@@ -216,7 +213,9 @@ namespace System.Reactive.Subjects
             if (os != null)
             {
                 foreach (var o in os)
+                {
                     o.OnNext(value);
+                }
             }
         }
 
@@ -229,7 +228,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="observer">Observer to subscribe to the subject.</param>
         /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
         public override IDisposable Subscribe(IObserver<T> observer)
         {
             if (observer == null)
@@ -252,9 +251,13 @@ namespace System.Reactive.Subjects
             }
 
             if (ex != null)
+            {
                 observer.OnError(ex);
+            }
             else
+            {
                 observer.OnCompleted();
+            }
 
             return Disposable.Empty;
         }
@@ -285,7 +288,7 @@ namespace System.Reactive.Subjects
 
         #endregion
 
-        class Subscription : IDisposable
+        private sealed class Subscription : IDisposable
         {
             private readonly BehaviorSubject<T> _subject;
             private IObserver<T> _observer;

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Subjects/ConnectableObservable.cs

@@ -50,7 +50,7 @@ namespace System.Reactive.Subjects
             }
         }
 
-        class Connection : IDisposable
+        private sealed class Connection : IDisposable
         {
             private readonly ConnectableObservable<TSource, TResult> _parent;
             private IDisposable _subscription;

+ 42 - 81
Rx.NET/Source/src/System.Reactive/Subjects/ReplaySubject.cs

@@ -41,7 +41,7 @@ namespace System.Reactive.Subjects
         /// Initializes a new instance of the <see cref="ReplaySubject{T}" /> class with the specified scheduler.
         /// </summary>
         /// <param name="scheduler">Scheduler the observers are invoked on.</param>
-        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
         public ReplaySubject(IScheduler scheduler)
         {
             _implementation = new ReplayByTime(scheduler);
@@ -77,7 +77,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
         /// <param name="scheduler">Scheduler the observers are invoked on.</param>
-        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
         /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
         public ReplaySubject(int bufferSize, IScheduler scheduler)
         {
@@ -92,7 +92,7 @@ namespace System.Reactive.Subjects
         /// Initializes a new instance of the <see cref="ReplaySubject{T}" /> class with the specified window.
         /// </summary>
         /// <param name="window">Maximum time length of the replay buffer.</param>
-        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
         public ReplaySubject(TimeSpan window)
         {
             _implementation = new ReplayByTime(window);
@@ -103,8 +103,8 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="window">Maximum time length of the replay buffer.</param>
         /// <param name="scheduler">Scheduler the observers are invoked on.</param>
-        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
-        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
         public ReplaySubject(TimeSpan window, IScheduler scheduler)
         {
             _implementation = new ReplayByTime(window, scheduler);
@@ -119,7 +119,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
         /// <param name="window">Maximum time length of the replay buffer.</param>
-        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
         public ReplaySubject(int bufferSize, TimeSpan window)
         {
             _implementation = new ReplayByTime(bufferSize, window);
@@ -131,8 +131,8 @@ namespace System.Reactive.Subjects
         /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
         /// <param name="window">Maximum time length of the replay buffer.</param>
         /// <param name="scheduler">Scheduler the observers are invoked on.</param>
-        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
-        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
         public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
         {
             _implementation = new ReplayByTime(bufferSize, window, scheduler);
@@ -147,18 +147,12 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public override bool HasObservers
-        {
-            get { return _implementation.HasObservers; }
-        }
+        public override bool HasObservers => _implementation.HasObservers;
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
         /// </summary>
-        public override bool IsDisposed
-        {
-            get { return _implementation.IsDisposed; }
-        }
+        public override bool IsDisposed => _implementation.IsDisposed;
 
         #endregion
 
@@ -170,16 +164,13 @@ namespace System.Reactive.Subjects
         /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
         /// </summary>
         /// <param name="value">The value to send to all observers.</param>
-        public override void OnNext(T value)
-        {
-            _implementation.OnNext(value);
-        }
+        public override void OnNext(T value) => _implementation.OnNext(value);
 
         /// <summary>
         /// Notifies all subscribed and future observers about the specified exception.
         /// </summary>
         /// <param name="error">The exception to send to all observers.</param>
-        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
         public override void OnError(Exception error)
         {
             if (error == null)
@@ -191,10 +182,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Notifies all subscribed and future observers about the end of the sequence.
         /// </summary>
-        public override void OnCompleted()
-        {
-            _implementation.OnCompleted();
-        }
+        public override void OnCompleted() => _implementation.OnCompleted();
 
         #endregion
 
@@ -205,7 +193,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="observer">Observer to subscribe to the subject.</param>
         /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
         public override IDisposable Subscribe(IObserver<T> observer)
         {
             if (observer == null)
@@ -221,10 +209,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Releases all resources used by the current instance of the <see cref="ReplaySubject{T}"/> class and unsubscribe all observers.
         /// </summary>
-        public override void Dispose()
-        {
-            _implementation.Dispose();
-        }
+        public override void Dispose() => _implementation.Dispose();
 
         #endregion
 
@@ -248,14 +233,7 @@ namespace System.Reactive.Subjects
                 _error = null;
             }
 
-            public override bool HasObservers
-            {
-                get
-                {
-                    var observers = _observers;
-                    return observers != null && observers.Data.Length > 0;
-                }
-            }
+            public override bool HasObservers => _observers?.Data.Length > 0;
 
             public override bool IsDisposed
             {
@@ -282,14 +260,18 @@ namespace System.Reactive.Subjects
 
                         o = _observers.Data;
                         foreach (var observer in o)
+                        {
                             observer.OnNext(value);
+                        }
                     }
                 }
 
                 if (o != null)
                 {
                     foreach (var observer in o)
+                    {
                         observer.EnsureActive();
+                    }
                 }
             }
 
@@ -308,7 +290,9 @@ namespace System.Reactive.Subjects
 
                         o = _observers.Data;
                         foreach (var observer in o)
+                        {
                             observer.OnError(error);
+                        }
 
                         _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                     }
@@ -317,7 +301,9 @@ namespace System.Reactive.Subjects
                 if (o != null)
                 {
                     foreach (var observer in o)
+                    {
                         observer.EnsureActive();
+                    }
                 }
             }
 
@@ -335,7 +321,9 @@ namespace System.Reactive.Subjects
 
                         o = _observers.Data;
                         foreach (var observer in o)
+                        {
                             observer.OnCompleted();
+                        }
 
                         _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                     }
@@ -344,7 +332,9 @@ namespace System.Reactive.Subjects
                 if (o != null)
                 {
                     foreach (var observer in o)
+                    {
                         observer.EnsureActive();
+                    }
                 }
             }
 
@@ -443,46 +433,6 @@ namespace System.Reactive.Subjects
                 }
             }
 
-#if NOTYET // TODO: Expose internal notifications similar to BehaviorSubject<T>.TryGetValue?
-
-            public bool TryGetNotifications(out IList<Notification<T>> notifications)
-            {
-                lock (_gate)
-                {
-                    if (_isDisposed)
-                    {
-                        notifications = null;
-                        return false;
-                    }
-                    else
-                    {
-                        var res = new List<Notification<T>>();
-
-                        var materializer = Observer.Create<T>(
-                            x => res.Add(Notification.CreateOnNext(x)),
-                            ex => res.Add(Notification.CreateOnError<T>(ex)),
-                            () => res.Add(Notification.CreateOnCompleted<T>())
-                        );
-
-                        Replay(materializer);
-
-                        if (_error != null)
-                        {
-                            materializer.OnError(_error);
-                        }
-                        else if (_isStopped)
-                        {
-                            materializer.OnCompleted();
-                        }
-
-                        notifications = res;
-                        return true;
-                    }
-                }
-            }
-
-#endif
-
             private sealed class Subscription : IDisposable
             {
                 private readonly ReplayBase _subject;
@@ -580,7 +530,9 @@ namespace System.Reactive.Subjects
                 var n = _queue.Count;
 
                 foreach (var item in _queue)
+                {
                     observer.OnNext(item.Value);
+                }
 
                 return n;
             }
@@ -590,14 +542,19 @@ namespace System.Reactive.Subjects
                 var now = _stopwatch.Elapsed;
 
                 while (_queue.Count > _bufferSize)
+                {
                     _queue.Dequeue();
+                }
+
                 while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
+                {
                     _queue.Dequeue();
+                }
             }
         }
 
         //
-        // Below are the non-time based implementations. 
+        // Below are the non-time based implementations.
         // These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
         // The ReplayOne implementation also removes the need to even have a queue.
         //
@@ -652,7 +609,9 @@ namespace System.Reactive.Subjects
             protected override void Trim()
             {
                 while (_queue.Count > _bufferSize)
+                {
                     _queue.Dequeue();
+                }
             }
         }
 
@@ -703,7 +662,9 @@ namespace System.Reactive.Subjects
                 var n = _queue.Count;
 
                 foreach (var item in _queue)
+                {
                     observer.OnNext(item);
+                }
 
                 return n;
             }
@@ -719,7 +680,7 @@ namespace System.Reactive.Subjects
     /// Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
     /// </summary>
     /// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
-    class FastImmediateObserver<T> : IScheduledObserver<T>
+    internal sealed class FastImmediateObserver<T> : IScheduledObserver<T>
     {
         /// <summary>
         /// Gate to control ownership transfer and protect data structures.
@@ -985,4 +946,4 @@ namespace System.Reactive.Subjects
             return Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
         }
     }
-}
+}

+ 10 - 16
Rx.NET/Source/src/System.Reactive/Subjects/Subject.Extensions.cs

@@ -20,7 +20,7 @@ namespace System.Reactive.Subjects
         /// <param name="observer">The observer used to send messages to the subject.</param>
         /// <param name="observable">The observable used to subscribe to messages sent from the subject.</param>
         /// <returns>Subject implemented using the given observer and observable.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is <c>null</c>.</exception>
         public static ISubject<TSource, TResult> Create<TSource, TResult>(IObserver<TSource> observer, IObservable<TResult> observable)
         {
             if (observer == null)
@@ -38,7 +38,7 @@ namespace System.Reactive.Subjects
         /// <param name="observer">The observer used to send messages to the subject.</param>
         /// <param name="observable">The observable used to subscribe to messages sent from the subject.</param>
         /// <returns>Subject implemented using the given observer and observable.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is <c>null</c>.</exception>
         public static ISubject<T> Create<T>(IObserver<T> observer, IObservable<T> observable)
         {
             if (observer == null)
@@ -56,7 +56,7 @@ namespace System.Reactive.Subjects
         /// <typeparam name="TResult">The type of the elements produced by the subject.</typeparam>
         /// <param name="subject">The subject to synchronize.</param>
         /// <returns>Subject whose messages are synchronized.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="subject"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="subject"/> is <c>null</c>.</exception>
         public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject)
         {
             if (subject == null)
@@ -71,7 +71,7 @@ namespace System.Reactive.Subjects
         /// <typeparam name="TSource">The type of the elements received and produced by the subject.</typeparam>
         /// <param name="subject">The subject to synchronize.</param>
         /// <returns>Subject whose messages are synchronized.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="subject"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="subject"/> is <c>null</c>.</exception>
         public static ISubject<TSource> Synchronize<TSource>(ISubject<TSource> subject)
         {
             if (subject == null)
@@ -88,7 +88,7 @@ namespace System.Reactive.Subjects
         /// <param name="subject">The subject to synchronize.</param>
         /// <param name="scheduler">Scheduler to notify observers on.</param>
         /// <returns>Subject whose messages are synchronized and whose observers are notified on the given scheduler.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
         public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject, IScheduler scheduler)
         {
             if (subject == null)
@@ -106,7 +106,7 @@ namespace System.Reactive.Subjects
         /// <param name="subject">The subject to synchronize.</param>
         /// <param name="scheduler">Scheduler to notify observers on.</param>
         /// <returns>Subject whose messages are synchronized and whose observers are notified on the given scheduler.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
         public static ISubject<TSource> Synchronize<TSource>(ISubject<TSource> subject, IScheduler scheduler)
         {
             if (subject == null)
@@ -117,7 +117,7 @@ namespace System.Reactive.Subjects
             return new AnonymousSubject<TSource>(Observer.Synchronize(subject), subject.ObserveOn(scheduler));
         }
 
-        class AnonymousSubject<T, U> : ISubject<T, U>
+        private class AnonymousSubject<T, U> : ISubject<T, U>
         {
             private readonly IObserver<T> _observer;
             private readonly IObservable<U> _observable;
@@ -128,10 +128,7 @@ namespace System.Reactive.Subjects
                 _observable = observable;
             }
 
-            public void OnCompleted()
-            {
-                _observer.OnCompleted();
-            }
+            public void OnCompleted() => _observer.OnCompleted();
 
             public void OnError(Exception error)
             {
@@ -141,10 +138,7 @@ namespace System.Reactive.Subjects
                 _observer.OnError(error);
             }
 
-            public void OnNext(T value)
-            {
-                _observer.OnNext(value);
-            }
+            public void OnNext(T value) => _observer.OnNext(value);
 
             public IDisposable Subscribe(IObserver<U> observer)
             {
@@ -158,7 +152,7 @@ namespace System.Reactive.Subjects
             }
         }
 
-        class AnonymousSubject<T> : AnonymousSubject<T, T>, ISubject<T>
+        private sealed class AnonymousSubject<T> : AnonymousSubject<T, T>, ISubject<T>
         {
             public AnonymousSubject(IObserver<T> observer, IObservable<T> observable)
                 : base(observer, observable)

+ 1 - 187
Rx.NET/Source/src/System.Reactive/Subjects/Subject.cs

@@ -2,7 +2,6 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
-#if !NO_PERF
 using System.Reactive.Disposables;
 using System.Threading;
 
@@ -170,7 +169,7 @@ namespace System.Reactive.Subjects
             return new Subscription(this, observer);
         }
 
-        class Subscription : IDisposable
+        private sealed class Subscription : IDisposable
         {
             private Subject<T> _subject;
             private IObserver<T> _observer;
@@ -236,188 +235,3 @@ namespace System.Reactive.Subjects
         #endregion
     }
 }
-#else
-using System.Reactive.Disposables;
-using System.Threading;
-
-namespace System.Reactive.Subjects
-{
-    /// <summary>
-    /// Represents an object that is both an observable sequence as well as an observer.
-    /// Each notification is broadcasted to all subscribed observers.
-    /// </summary>
-    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
-    public sealed class Subject<T> : ISubject<T>, IDisposable
-    {
-        bool isDisposed;
-        bool isStopped;
-        ImmutableList<IObserver<T>> observers;
-        object gate = new object();
-        Exception exception;
-
-        /// <summary>
-        /// Creates a subject.
-        /// </summary>
-        public Subject()
-        {
-            observers = new ImmutableList<IObserver<T>>();
-        }
-
-        /// <summary>
-        /// Notifies all subscribed observers about the end of the sequence.
-        /// </summary>
-        public void OnCompleted()
-        {
-            var os = default(IObserver<T>[]);
-            lock (gate)
-            {
-                CheckDisposed();
-
-                if (!isStopped)
-                {
-                    os = observers.Data;
-                    observers = new ImmutableList<IObserver<T>>();
-                    isStopped = true;
-                }
-            }
-
-            if (os != null)
-                foreach (var o in os)
-                    o.OnCompleted();
-        }
-
-        /// <summary>
-        /// Notifies all subscribed observers with the exception.
-        /// </summary>
-        /// <param name="error">The exception to send to all subscribed observers.</param>
-        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
-        public void OnError(Exception error)
-        {
-            if (error == null)
-                throw new ArgumentNullException(nameof(error));
-
-            var os = default(IObserver<T>[]);
-            lock (gate)
-            {
-                CheckDisposed();
-
-                if (!isStopped)
-                {
-                    os = observers.Data;
-                    observers = new ImmutableList<IObserver<T>>();
-                    isStopped = true;
-                    exception = error;
-                }
-            }
-
-            if (os != null)
-                foreach (var o in os)
-                    o.OnError(error);
-        }
-
-        /// <summary>
-        /// Notifies all subscribed observers with the value.
-        /// </summary>
-        /// <param name="value">The value to send to all subscribed observers.</param>
-        public void OnNext(T value)
-        {
-            var os = default(IObserver<T>[]);
-            lock (gate)
-            {
-                CheckDisposed();
-
-                if (!isStopped)
-                {
-                    os = observers.Data;
-                }
-            }
-
-            if (os != null)
-                foreach (var o in os)
-                    o.OnNext(value);
-        }
-
-        /// <summary>
-        /// Subscribes an observer to the subject.
-        /// </summary>
-        /// <param name="observer">Observer to subscribe to the subject.</param>
-        /// <remarks>IDisposable object that can be used to unsubscribe the observer from the subject.</remarks>
-        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
-        public IDisposable Subscribe(IObserver<T> observer)
-        {
-            if (observer == null)
-                throw new ArgumentNullException(nameof(observer));
-
-            lock (gate)
-            {
-                CheckDisposed();
-
-                if (!isStopped)
-                {
-                    observers = observers.Add(observer);
-                    return new Subscription(this, observer);
-                }
-                else if (exception != null)
-                {
-                    observer.OnError(exception);
-                    return Disposable.Empty;
-                }
-                else
-                {
-                    observer.OnCompleted();
-                    return Disposable.Empty;
-                }
-            }
-        }
-
-        void Unsubscribe(IObserver<T> observer)
-        {
-            lock (gate)
-            {
-                if (observers != null)
-                    observers = observers.Remove(observer);
-            }
-        }
-
-        class Subscription : IDisposable
-        {
-            Subject<T> subject;
-            IObserver<T> observer;
-
-            public Subscription(Subject<T> subject, IObserver<T> observer)
-            {
-                this.subject = subject;
-                this.observer = observer;
-            }
-
-            public void Dispose()
-            {
-                var o = Interlocked.Exchange<IObserver<T>>(ref observer, null);
-                if (o != null)
-                {
-                    subject.Unsubscribe(o);
-                    subject = null;
-                }
-            }
-        }
-
-        void CheckDisposed()
-        {
-            if (isDisposed)
-                throw new ObjectDisposedException(string.Empty);
-        }
-
-        /// <summary>
-        /// Unsubscribe all observers and release resources.
-        /// </summary>
-        public void Dispose()
-        {
-            lock (gate)
-            {
-                isDisposed = true;
-                observers = null;
-            }
-        }
-    }
-}
-#endif

+ 3 - 3
Rx.NET/Source/src/System.Reactive/Subjects/SubjectBase.cs

@@ -5,7 +5,7 @@
 namespace System.Reactive.Subjects
 {
     /// <summary>
-    /// Base calss for objects that are both an observable sequence as well as an observer.
+    /// Base class for objects that are both an observable sequence as well as an observer.
     /// </summary>
     /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
     public abstract class SubjectBase<T> : ISubject<T>, IDisposable
@@ -34,7 +34,7 @@ namespace System.Reactive.Subjects
         /// Notifies all subscribed observers about the specified exception.
         /// </summary>
         /// <param name="error">The exception to send to all currently subscribed observers.</param>
-        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
         public abstract void OnError(Exception error);
 
         /// <summary>
@@ -48,7 +48,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="observer">Observer to subscribe to the subject.</param>
         /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
         public abstract IDisposable Subscribe(IObserver<T> observer);
     }
 }