Sfoglia il codice sorgente

Some enhancements to subjects.

Bart De Smet 10 anni fa
parent
commit
f065eea49f

+ 57 - 7
Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/AsyncSubject.cs

@@ -12,11 +12,13 @@ namespace System.Reactive.Subjects
     /// The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
     /// </summary>
     /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
-    public sealed class AsyncSubject<T> : ISubject<T>, IDisposable
+    public sealed class AsyncSubject<T> : SubjectBase<T>, IDisposable
 #if HAS_AWAIT
         , INotifyCompletion
 #endif
     {
+        #region Fields
+
         private readonly object _gate = new object();
 
         private ImmutableList<IObserver<T>> _observers;
@@ -26,6 +28,10 @@ namespace System.Reactive.Subjects
         private bool _hasValue;
         private Exception _exception;
 
+        #endregion
+
+        #region Constructors
+
         /// <summary>
         /// Creates a subject that can only receive one value and that value is cached for all future observations.
         /// </summary>
@@ -34,10 +40,14 @@ namespace System.Reactive.Subjects
             _observers = ImmutableList<IObserver<T>>.Empty;
         }
 
+        #endregion
+
+        #region Properties
+
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public bool HasObservers
+        public override bool HasObservers
         {
             get
             {
@@ -46,10 +56,30 @@ namespace System.Reactive.Subjects
             }
         }
 
+        /// <summary>
+        /// Indicates whether the subject has been disposed.
+        /// </summary>
+        public override bool IsDisposed
+        {
+            get
+            {
+                lock (_gate)
+                {
+                    return _isDisposed;
+                }
+            }
+        }
+
+        #endregion
+
+        #region Methods
+
+        #region IObserver<T> implementation
+
         /// <summary>
         /// Notifies all subscribed observers about the end of the sequence, also causing the last received value to be sent out (if any).
         /// </summary>
-        public void OnCompleted()
+        public override void OnCompleted()
         {
             var os = default(IObserver<T>[]);
 
@@ -90,7 +120,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="error">The exception to send to all observers.</param>
         /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
-        public void OnError(Exception error)
+        public override void OnError(Exception error)
         {
             if (error == null)
                 throw new ArgumentNullException("error");
@@ -118,7 +148,7 @@ namespace System.Reactive.Subjects
         /// Sends a value to the subject. The last value received before successful termination will be sent to all subscribed and future observers.
         /// </summary>
         /// <param name="value">The value to store in the subject.</param>
-        public void OnNext(T value)
+        public override void OnNext(T value)
         {
             lock (_gate)
             {
@@ -132,13 +162,17 @@ namespace System.Reactive.Subjects
             }
         }
 
+        #endregion
+
+        #region IObservable<T> implementation
+
         /// <summary>
         /// Subscribes an observer to the subject.
         /// </summary>
         /// <param name="observer">Observer to subscribe to the subject.</param>
         /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
-        public IDisposable Subscribe(IObserver<T> observer)
+        public override IDisposable Subscribe(IObserver<T> observer)
         {
             if (observer == null)
                 throw new ArgumentNullException("observer");
@@ -163,14 +197,18 @@ namespace System.Reactive.Subjects
             }
 
             if (ex != null)
+            {
                 observer.OnError(ex);
+            }
             else if (hv)
             {
                 observer.OnNext(v);
                 observer.OnCompleted();
             }
             else
+            {
                 observer.OnCompleted();
+            }
 
             return Disposable.Empty;
         }
@@ -202,6 +240,10 @@ namespace System.Reactive.Subjects
             }
         }
 
+        #endregion
+
+        #region IDisposable implementation
+
         void CheckDisposed()
         {
             if (_isDisposed)
@@ -211,7 +253,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Unsubscribe all observers and release resources.
         /// </summary>
-        public void Dispose()
+        public override void Dispose()
         {
             lock (_gate)
             {
@@ -222,6 +264,10 @@ namespace System.Reactive.Subjects
             }
         }
 
+        #endregion
+
+        #region Await support
+
 #if HAS_AWAIT
         /// <summary>
         /// Gets an awaitable object for the current AsyncSubject.
@@ -342,5 +388,9 @@ namespace System.Reactive.Subjects
 
             return _value;
         }
+
+        #endregion
+
+        #endregion
     }
 }

+ 83 - 7
Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/BehaviorSubject.cs

@@ -9,8 +9,10 @@ namespace System.Reactive.Subjects
     /// Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
     /// </summary>
     /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
-    public sealed class BehaviorSubject<T> : ISubject<T>, IDisposable
+    public sealed class BehaviorSubject<T> : SubjectBase<T>, IDisposable
     {
+        #region Fields
+
         private readonly object _gate = new object();
 
         private ImmutableList<IObserver<T>> _observers;
@@ -19,6 +21,10 @@ namespace System.Reactive.Subjects
         private Exception _exception;
         private bool _isDisposed;
 
+        #endregion
+
+        #region Constructors
+
         /// <summary>
         /// Initializes a new instance of the <see cref="System.Reactive.Subjects.BehaviorSubject&lt;T&gt;"/> class which creates a subject that caches its last value and starts with the specified value.
         /// </summary>
@@ -29,10 +35,14 @@ namespace System.Reactive.Subjects
             _observers = ImmutableList<IObserver<T>>.Empty;
         }
 
+        #endregion
+
+        #region Properties
+
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public bool HasObservers
+        public override bool HasObservers
         {
             get
             {
@@ -41,6 +51,20 @@ namespace System.Reactive.Subjects
             }
         }
 
+        /// <summary>
+        /// Indicates whether the subject has been disposed.
+        /// </summary>
+        public override bool IsDisposed
+        {
+            get
+            {
+                lock (_gate)
+                {
+                    return _isDisposed;
+                }
+            }
+        }
+
         /// <summary>
         /// Gets the current value or throws an exception.
         /// </summary>
@@ -73,10 +97,50 @@ namespace System.Reactive.Subjects
             }
         }
 
+        #endregion
+
+        #region Methods
+
+        /// <summary>
+        /// Tries to get the current value or throws an exception.
+        /// </summary>
+        /// <param name="value">The initial value passed to the constructor until <see cref="OnNext"/> is called; after which, the last value passed to <see cref="OnNext"/>.</param>
+        /// <returns>true if a value is available; false if the subject was disposed.</returns>
+        /// <remarks>
+        /// <para>The value returned from <see cref="TryGetValue"/> is frozen after <see cref="OnCompleted"/> is called.</para>
+        /// <para>After <see cref="OnError"/> is called, <see cref="TryGetValue"/> always throws the specified exception.</para>
+        /// <alert type="caller">
+        /// Calling <see cref="TryGetValue"/> is a thread-safe operation, though there's a potential race condition when <see cref="OnNext"/> or <see cref="OnError"/> are being invoked concurrently.
+        /// In some cases, it may be necessary for a caller to use external synchronization to avoid race conditions.
+        /// </alert>
+        /// </remarks>
+        public bool TryGetValue(out T value)
+        {
+            lock (_gate)
+            {
+                if (_isDisposed)
+                {
+                    value = default(T);
+                    return false;
+                }
+                else if (_exception != null)
+                {
+                    throw _exception;
+                }
+                else
+                {
+                    value = _value;
+                    return true;
+                }
+            }
+        }
+
+        #region IObserver<T> implementation
+
         /// <summary>
         /// Notifies all subscribed observers about the end of the sequence.
         /// </summary>
-        public void OnCompleted()
+        public override void OnCompleted()
         {
             var os = default(IObserver<T>[]);
             lock (_gate)
@@ -103,7 +167,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="error">The exception to send to all observers.</param>
         /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
-        public void OnError(Exception error)
+        public override void OnError(Exception error)
         {
             if (error == null)
                 throw new ArgumentNullException("error");
@@ -133,7 +197,7 @@ namespace System.Reactive.Subjects
         /// Notifies all subscribed observers about the arrival of the specified element in the sequence.
         /// </summary>
         /// <param name="value">The value to send to all observers.</param>
-        public void OnNext(T value)
+        public override void OnNext(T value)
         {
             var os = default(IObserver<T>[]);
             lock (_gate)
@@ -154,13 +218,17 @@ namespace System.Reactive.Subjects
             }
         }
 
+        #endregion
+
+        #region IObservable<T> implementation
+
         /// <summary>
         /// Subscribes an observer to the subject.
         /// </summary>
         /// <param name="observer">Observer to subscribe to the subject.</param>
         /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
-        public IDisposable Subscribe(IObserver<T> observer)
+        public override IDisposable Subscribe(IObserver<T> observer)
         {
             if (observer == null)
                 throw new ArgumentNullException("observer");
@@ -189,10 +257,14 @@ namespace System.Reactive.Subjects
             return Disposable.Empty;
         }
 
+        #endregion
+
+        #region IDisposable implementation
+
         /// <summary>
         /// Unsubscribe all observers and release resources.
         /// </summary>
-        public void Dispose()
+        public override void Dispose()
         {
             lock (_gate)
             {
@@ -209,6 +281,8 @@ namespace System.Reactive.Subjects
                 throw new ObjectDisposedException(string.Empty);
         }
 
+        #endregion
+
         class Subscription : IDisposable
         {
             private readonly BehaviorSubject<T> _subject;
@@ -235,5 +309,7 @@ namespace System.Reactive.Subjects
                 }
             }
         }
+
+        #endregion
     }
 }

+ 95 - 34
Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs

@@ -2,6 +2,7 @@
 
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 using System.Threading;
 
 namespace System.Reactive.Subjects
@@ -11,14 +12,14 @@ namespace System.Reactive.Subjects
     /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
     /// </summary>
     /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
-    public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
+    public sealed class ReplaySubject<T> : SubjectBase<T>, IDisposable
     {
         #region Fields
 
         /// <summary>
         /// Underlying optimized implementation of the replay subject.
         /// </summary>
-        private readonly IReplaySubjectImplementation _implementation;
+        private readonly SubjectBase<T> _implementation;
 
         #endregion
 
@@ -144,22 +145,30 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public bool HasObservers
+        public override bool HasObservers
         {
             get { return _implementation.HasObservers; }
         }
 
+        /// <summary>
+        /// Indicates whether the subject has been disposed.
+        /// </summary>
+        public override bool IsDisposed
+        {
+            get { return _implementation.IsDisposed; }
+        }
+
         #endregion
 
         #region Methods
 
-        #region Observer implementation
+        #region IObserver<T> implementation
 
         /// <summary>
         /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
         /// </summary>
         /// <param name="value">The value to send to all observers.</param>
-        public void OnNext(T value)
+        public override void OnNext(T value)
         {
             _implementation.OnNext(value);
         }
@@ -169,22 +178,25 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="error">The exception to send to all observers.</param>
         /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
-        public void OnError(Exception error)
+        public override void OnError(Exception error)
         {
+            if (error == null)
+                throw new ArgumentNullException("error");
+
             _implementation.OnError(error);
         }
 
         /// <summary>
         /// Notifies all subscribed and future observers about the end of the sequence.
         /// </summary>
-        public void OnCompleted()
+        public override void OnCompleted()
         {
             _implementation.OnCompleted();
         }
 
         #endregion
 
-        #region IObservable implementation
+        #region IObservable<T> implementation
 
         /// <summary>
         /// Subscribes an observer to the subject.
@@ -192,8 +204,11 @@ namespace System.Reactive.Subjects
         /// <param name="observer">Observer to subscribe to the subject.</param>
         /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
-        public IDisposable Subscribe(IObserver<T> observer)
+        public override IDisposable Subscribe(IObserver<T> observer)
         {
+            if (observer == null)
+                throw new ArgumentNullException("observer");
+
             return _implementation.Subscribe(observer);
         }
 
@@ -204,7 +219,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> class and unsubscribe all observers.
         /// </summary>
-        public void Dispose()
+        public override void Dispose()
         {
             _implementation.Dispose();
         }
@@ -213,12 +228,7 @@ namespace System.Reactive.Subjects
 
         #endregion
 
-        private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
-        {
-            bool HasObservers { get; }
-        }
-
-        private abstract class ReplayBase : IReplaySubjectImplementation
+        private abstract class ReplayBase : SubjectBase<T>
         {
             private readonly object _gate = new object();
 
@@ -236,7 +246,7 @@ namespace System.Reactive.Subjects
                 _error = null;
             }
 
-            public bool HasObservers
+            public override bool HasObservers
             {
                 get
                 {
@@ -245,7 +255,18 @@ namespace System.Reactive.Subjects
                 }
             }
 
-            public void OnNext(T value)
+            public override bool IsDisposed
+            {
+                get
+                {
+                    lock (_gate)
+                    {
+                        return _isDisposed;
+                    }
+                }
+            }
+
+            public override void OnNext(T value)
             {
                 var o = default(IScheduledObserver<T>[]);
                 lock (_gate)
@@ -270,11 +291,8 @@ namespace System.Reactive.Subjects
                 }
             }
 
-            public void OnError(Exception error)
+            public override void OnError(Exception error)
             {
-                if (error == null)
-                    throw new ArgumentNullException("error");
-
                 var o = default(IScheduledObserver<T>[]);
                 lock (_gate)
                 {
@@ -301,7 +319,7 @@ namespace System.Reactive.Subjects
                 }
             }
 
-            public void OnCompleted()
+            public override void OnCompleted()
             {
                 var o = default(IScheduledObserver<T>[]);
                 lock (_gate)
@@ -328,16 +346,14 @@ namespace System.Reactive.Subjects
                 }
             }
 
-            public IDisposable Subscribe(IObserver<T> observer)
+            public override IDisposable Subscribe(IObserver<T> observer)
             {
-                if (observer == null)
-                    throw new ArgumentNullException("observer");
-
                 var so = CreateScheduledObserver(observer);
 
                 var n = 0;
 
-                var subscription = new Subscription(this, so);
+                var subscription = Disposable.Empty;
+
                 lock (_gate)
                 {
                     CheckDisposed();
@@ -362,7 +378,6 @@ namespace System.Reactive.Subjects
                     // reasons with v1.x.
                     //
                     Trim();
-                    _observers = _observers.Add(so);
 
                     n = Replay(so);
 
@@ -376,6 +391,12 @@ namespace System.Reactive.Subjects
                         n++;
                         so.OnCompleted();
                     }
+
+                    if (!_isStopped)
+                    {
+                        subscription = new Subscription(this, so);
+                        _observers = _observers.Add(so);
+                    }
                 }
 
                 so.EnsureActive(n);
@@ -383,7 +404,7 @@ namespace System.Reactive.Subjects
                 return subscription;
             }
 
-            public void Dispose()
+            public override void Dispose()
             {
                 lock (_gate)
                 {
@@ -420,6 +441,46 @@ namespace System.Reactive.Subjects
                 }
             }
 
+#if NOTYET // TODO: Expose internal notifications similar to BehaviorSubject<T>.TryGetValue?
+
+            public bool TryGetNotifications(out IList<Notification<T>> notifications)
+            {
+                lock (_gate)
+                {
+                    if (_isDisposed)
+                    {
+                        notifications = null;
+                        return false;
+                    }
+                    else
+                    {
+                        var res = new List<Notification<T>>();
+
+                        var materializer = Observer.Create<T>(
+                            x => res.Add(Notification.CreateOnNext(x)),
+                            ex => res.Add(Notification.CreateOnError<T>(ex)),
+                            () => res.Add(Notification.CreateOnCompleted<T>())
+                        );
+
+                        Replay(materializer);
+
+                        if (_error != null)
+                        {
+                            materializer.OnError(_error);
+                        }
+                        else if (_isStopped)
+                        {
+                            materializer.OnCompleted();
+                        }
+
+                        notifications = res;
+                        return true;
+                    }
+                }
+            }
+
+#endif
+
             private sealed class Subscription : IDisposable
             {
                 private readonly ReplayBase _subject;
@@ -539,7 +600,7 @@ namespace System.Reactive.Subjects
         // The ReplayOne implementation also removes the need to even have a queue.
         //
 
-        private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
+        private sealed class ReplayOne : ReplayBufferBase
         {
             private bool _hasValue;
             private T _value;
@@ -576,7 +637,7 @@ namespace System.Reactive.Subjects
             }
         }
 
-        private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
+        private sealed class ReplayMany : ReplayManyBase
         {
             private readonly int _bufferSize;
 
@@ -593,7 +654,7 @@ namespace System.Reactive.Subjects
             }
         }
 
-        private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
+        private sealed class ReplayAll : ReplayManyBase
         {
             public ReplayAll()
                 : base(0)
@@ -620,7 +681,7 @@ namespace System.Reactive.Subjects
             }
         }
 
-        private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
+        private abstract class ReplayManyBase : ReplayBufferBase
         {
             protected readonly Queue<T> _queue;
 

+ 46 - 7
Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/Subject.cs

@@ -11,10 +11,16 @@ namespace System.Reactive.Subjects
     /// Each notification is broadcasted to all subscribed observers.
     /// </summary>
     /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
-    public sealed class Subject<T> : ISubject<T>, IDisposable
+    public sealed class Subject<T> : SubjectBase<T>, IDisposable
     {
+        #region Fields
+
         private volatile IObserver<T> _observer;
 
+        #endregion
+
+        #region Constructors
+
         /// <summary>
         /// Creates a subject.
         /// </summary>
@@ -23,10 +29,14 @@ namespace System.Reactive.Subjects
             _observer = NopObserver<T>.Instance;
         }
 
+        #endregion
+
+        #region Properties
+
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public bool HasObservers
+        public override bool HasObservers
         {
             get
             {
@@ -34,10 +44,27 @@ namespace System.Reactive.Subjects
             }
         }
 
+        /// <summary>
+        /// Indicates whether the subject has been disposed.
+        /// </summary>
+        public override bool IsDisposed
+        {
+            get
+            {
+                return _observer is DisposedObserver<T>;
+            }
+        }
+
+        #endregion
+
+        #region Methods
+
+        #region IObserver<T> implementation
+
         /// <summary>
         /// Notifies all subscribed observers about the end of the sequence.
         /// </summary>
-        public void OnCompleted()
+        public override void OnCompleted()
         {
             var oldObserver = default(IObserver<T>);
             var newObserver = DoneObserver<T>.Completed;
@@ -60,7 +87,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         /// <param name="error">The exception to send to all currently subscribed observers.</param>
         /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
-        public void OnError(Exception error)
+        public override void OnError(Exception error)
         {
             if (error == null)
                 throw new ArgumentNullException("error");
@@ -85,18 +112,22 @@ namespace System.Reactive.Subjects
         /// Notifies all subscribed observers about the arrival of the specified element in the sequence.
         /// </summary>
         /// <param name="value">The value to send to all currently subscribed observers.</param>
-        public void OnNext(T value)
+        public override void OnNext(T value)
         {
             _observer.OnNext(value);
         }
 
+        #endregion
+
+        #region IObservable<T> implementation
+
         /// <summary>
         /// Subscribes an observer to the subject.
         /// </summary>
         /// <param name="observer">Observer to subscribe to the subject.</param>
         /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
-        public IDisposable Subscribe(IObserver<T> observer)
+        public override IDisposable Subscribe(IObserver<T> observer)
         {
             if (observer == null)
                 throw new ArgumentNullException("observer");
@@ -200,13 +231,21 @@ namespace System.Reactive.Subjects
 #pragma warning restore 0420
         }
 
+        #endregion
+
+        #region IDisposable implementation
+
         /// <summary>
         /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.Subject&lt;T&gt;"/> class and unsubscribes all observers.
         /// </summary>
-        public void Dispose()
+        public override void Dispose()
         {
             _observer = DisposedObserver<T>.Instance;
         }
+
+        #endregion
+
+        #endregion
     }
 }
 #else

+ 55 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/SubjectBase.cs

@@ -0,0 +1,55 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace System.Reactive.Subjects
+{
+    /// <summary>
+    /// Base calss for objects that are both an observable sequence as well as an observer.
+    /// </summary>
+    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
+    public abstract class SubjectBase<T> : ISubject<T>, IDisposable
+    {
+        /// <summary>
+        /// Indicates whether the subject has observers subscribed to it.
+        /// </summary>
+        public abstract bool HasObservers { get; }
+
+        /// <summary>
+        /// Indicates whether the subject has been disposed.
+        /// </summary>
+        public abstract bool IsDisposed { get; }
+
+        /// <summary>
+        /// Releases all resources used by the current instance of the subject and unsubscribes all observers.
+        /// </summary>
+        public abstract void Dispose();
+
+        /// <summary>
+        /// Notifies all subscribed observers about the end of the sequence.
+        /// </summary>
+        public abstract void OnCompleted();
+
+        /// <summary>
+        /// Notifies all subscribed observers about the specified exception.
+        /// </summary>
+        /// <param name="error">The exception to send to all currently subscribed observers.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
+        public abstract void OnError(Exception error);
+
+        /// <summary>
+        /// Notifies all subscribed observers about the arrival of the specified element in the sequence.
+        /// </summary>
+        /// <param name="value">The value to send to all currently subscribed observers.</param>
+        public abstract void OnNext(T value);
+
+        /// <summary>
+        /// Subscribes an observer to the subject.
+        /// </summary>
+        /// <param name="observer">Observer to subscribe to the subject.</param>
+        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
+        public abstract IDisposable Subscribe(IObserver<T> observer);
+    }
+}

+ 1 - 0
Rx.NET/Source/System.Reactive.Linq/System.Reactive.Linq.csproj

@@ -204,6 +204,7 @@
     <Compile Include="Reactive\Linq\Observable.Time.cs" />
     <Compile Include="Reactive\Internal\PushPullAdapter.cs" />
     <Compile Include="Reactive\Subjects\ReplaySubject.cs" />
+    <Compile Include="Reactive\Subjects\SubjectBase.cs" />
     <Compile Include="Reactive\Threading\Tasks\NamespaceDoc.cs" />
     <Compile Include="Reactive\Threading\Tasks\TaskObservableExtensions.cs" />
     <Compile Include="Reactive\TimeInterval.cs" />

+ 10 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/AsyncSubjectTest.cs

@@ -461,15 +461,19 @@ namespace ReactiveTests.Tests
         {
             var s = new AsyncSubject<int>();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             var d = s.Subscribe(_ => { });
             Assert.IsTrue(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
 
             d.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]
@@ -477,15 +481,19 @@ namespace ReactiveTests.Tests
         {
             var s = new AsyncSubject<int>();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             var d = s.Subscribe(_ => { });
             Assert.IsTrue(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             d.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]
@@ -493,9 +501,11 @@ namespace ReactiveTests.Tests
         {
             var s = new AsyncSubject<int>();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]

+ 67 - 5
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/BehaviorSubjectTest.cs

@@ -363,15 +363,19 @@ namespace ReactiveTests.Tests
         {
             var s = new BehaviorSubject<int>(42);
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             var d = s.Subscribe(_ => { });
             Assert.IsTrue(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
 
             d.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]
@@ -379,15 +383,19 @@ namespace ReactiveTests.Tests
         {
             var s = new BehaviorSubject<int>(42);
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             var d = s.Subscribe(_ => { });
             Assert.IsTrue(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             d.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]
@@ -395,9 +403,11 @@ namespace ReactiveTests.Tests
         {
             var s = new BehaviorSubject<int>(42);
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]
@@ -437,6 +447,10 @@ namespace ReactiveTests.Tests
         {
             var s = new BehaviorSubject<int>(42);
             Assert.AreEqual(42, s.Value);
+
+            var x = default(int);
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(42, x);
         }
 
         [TestMethod]
@@ -445,8 +459,15 @@ namespace ReactiveTests.Tests
             var s = new BehaviorSubject<int>(42);
             Assert.AreEqual(42, s.Value);
 
+            var x = default(int);
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(42, x);
+
             s.OnNext(43);
             Assert.AreEqual(43, s.Value);
+
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(43, x);
         }
 
         [TestMethod]
@@ -455,11 +476,21 @@ namespace ReactiveTests.Tests
             var s = new BehaviorSubject<int>(42);
             Assert.AreEqual(42, s.Value);
 
+            var x = default(int);
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(42, x);
+
             s.OnNext(43);
             Assert.AreEqual(43, s.Value);
 
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(43, x);
+
             s.OnNext(44);
             Assert.AreEqual(44, s.Value);
+
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(44, x);
         }
 
         [TestMethod]
@@ -468,31 +499,56 @@ namespace ReactiveTests.Tests
             var s = new BehaviorSubject<int>(42);
             Assert.AreEqual(42, s.Value);
 
+            var x = default(int);
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(42, x);
+
             s.OnNext(43);
             Assert.AreEqual(43, s.Value);
 
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(43, x);
+
             s.OnNext(44);
             Assert.AreEqual(44, s.Value);
 
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(44, x);
+
             s.OnCompleted();
             Assert.AreEqual(44, s.Value);
 
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(44, x);
+
             s.OnNext(1234);
             Assert.AreEqual(44, s.Value);
+
+            Assert.IsTrue(s.TryGetValue(out x));
+            Assert.AreEqual(44, x);
         }
 
-        [TestMethod, ExpectedException(typeof(InvalidOperationException))]
+        [TestMethod]
         public void Value_ThrowsAfterOnError()
         {
             var s = new BehaviorSubject<int>(42);
             Assert.AreEqual(42, s.Value);
 
             s.OnError(new InvalidOperationException());
-            
-            Assert.Fail("Should not be able to read Value: {0}", s.Value);
+
+            ReactiveAssert.Throws<InvalidOperationException>(() =>
+            {
+                var ignored = s.Value;
+            });
+
+            ReactiveAssert.Throws<InvalidOperationException>(() =>
+            {
+                var x = default(int);
+                s.TryGetValue(out x);
+            });
         }
 
-        [TestMethod, ExpectedException(typeof(ObjectDisposedException))]
+        [TestMethod]
         public void Value_ThrowsOnDispose()
         {
             var s = new BehaviorSubject<int>(42);
@@ -500,7 +556,13 @@ namespace ReactiveTests.Tests
 
             s.Dispose();
 
-            Assert.Fail("Should not be able to read Value: {0}", s.Value);
+            ReactiveAssert.Throws<ObjectDisposedException>(() =>
+            {
+                var ignored = s.Value;
+            });
+
+            var x = default(int);
+            Assert.IsFalse(s.TryGetValue(out x));
         }
     }
 }

+ 10 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs

@@ -1539,15 +1539,19 @@ namespace ReactiveTests.Tests
         private static void HasObservers_Dispose1(ReplaySubject<int> s)
         {
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             var d = s.Subscribe(_ => { });
             Assert.IsTrue(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
 
             d.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]
@@ -1562,15 +1566,19 @@ namespace ReactiveTests.Tests
         private static void HasObservers_Dispose2(ReplaySubject<int> s)
         {
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             var d = s.Subscribe(_ => { });
             Assert.IsTrue(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             d.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]
@@ -1585,9 +1593,11 @@ namespace ReactiveTests.Tests
         private static void HasObservers_Dispose3(ReplaySubject<int> s)
         {
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]

+ 10 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/SubjectTest.cs

@@ -519,15 +519,19 @@ namespace ReactiveTests.Tests
         {
             var s = new Subject<int>();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             var d = s.Subscribe(_ => { });
             Assert.IsTrue(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
 
             d.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]
@@ -535,15 +539,19 @@ namespace ReactiveTests.Tests
         {
             var s = new Subject<int>();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             var d = s.Subscribe(_ => { });
             Assert.IsTrue(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             d.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]
@@ -551,9 +559,11 @@ namespace ReactiveTests.Tests
         {
             var s = new Subject<int>();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsFalse(s.IsDisposed);
 
             s.Dispose();
             Assert.IsFalse(s.HasObservers);
+            Assert.IsTrue(s.IsDisposed);
         }
 
         [TestMethod]