Bläddra i källkod

4.x: Improve Subject observer tracking (#511)

David Karnok 7 år sedan
förälder
incheckning
185ed5d2fd
1 ändrade filer med 135 tillägg och 81 borttagningar
  1. 135 81
      Rx.NET/Source/src/System.Reactive/Subjects/Subject.cs

+ 135 - 81
Rx.NET/Source/src/System.Reactive/Subjects/Subject.cs

@@ -16,7 +16,15 @@ namespace System.Reactive.Subjects
     {
         #region Fields
 
-        private volatile IObserver<T> _observer;
+        SubjectDisposable[] _observers;
+
+        Exception _exception;
+
+        static readonly SubjectDisposable[] EMPTY = new SubjectDisposable[0];
+
+        static readonly SubjectDisposable[] TERMINATED = new SubjectDisposable[0];
+
+        static readonly SubjectDisposable[] DISPOSED = new SubjectDisposable[0];
 
         #endregion
 
@@ -27,7 +35,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         public Subject()
         {
-            _observer = NopObserver<T>.Instance;
+            Volatile.Write(ref _observers, EMPTY);
         }
 
         #endregion
@@ -41,14 +49,14 @@ namespace System.Reactive.Subjects
         {
             get
             {
-                return _observer != NopObserver<T>.Instance && !(_observer is DoneObserver<T>) && _observer != DisposedObserver<T>.Instance;
+                return Volatile.Read(ref _observers).Length != 0;
             }
         }
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
         /// </summary>
-        public override bool IsDisposed => _observer is DisposedObserver<T>;
+        public override bool IsDisposed => Volatile.Read(ref _observers) == DISPOSED;
 
         #endregion
 
@@ -56,23 +64,38 @@ namespace System.Reactive.Subjects
 
         #region IObserver<T> implementation
 
+        void ThrowDisposed()
+        {
+            throw new ObjectDisposedException(string.Empty);
+        }
+
         /// <summary>
         /// Notifies all subscribed observers about the end of the sequence.
         /// </summary>
         public override void OnCompleted()
         {
-            var oldObserver = default(IObserver<T>);
-            var newObserver = DoneObserver<T>.Completed;
-
-            do
+            for (; ; )
             {
-                oldObserver = _observer;
-
-                if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
+                var observers = Volatile.Read(ref _observers);
+                if (observers == DISPOSED)
+                {
+                    _exception = null;
+                    ThrowDisposed();
                     break;
-            } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
-
-            oldObserver.OnCompleted();
+                }
+                if (observers == TERMINATED)
+                {
+                    break;
+                }
+                if (Interlocked.CompareExchange(ref _observers, TERMINATED, observers) == observers)
+                {
+                    foreach (var observer in observers)
+                    {
+                        observer.Observer?.OnCompleted();
+                    }
+                    break;
+                }
+            }
         }
 
         /// <summary>
@@ -85,18 +108,29 @@ namespace System.Reactive.Subjects
             if (error == null)
                 throw new ArgumentNullException(nameof(error));
 
-            var oldObserver = default(IObserver<T>);
-            var newObserver = new DoneObserver<T> { Exception = error };
-
-            do
+            for (; ; )
             {
-                oldObserver = _observer;
-
-                if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
+                var observers = Volatile.Read(ref _observers);
+                if (observers == DISPOSED)
+                {
+                    _exception = null;
+                    ThrowDisposed();
                     break;
-            } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
-
-            oldObserver.OnError(error);
+                }
+                if (observers == TERMINATED)
+                {
+                    break;
+                }
+                _exception = error;
+                if (Interlocked.CompareExchange(ref _observers, TERMINATED, observers) == observers)
+                {
+                    foreach (var observer in observers)
+                    {
+                        observer.Observer?.OnError(error);
+                    }
+                    break;
+                }
+            }
         }
 
         /// <summary>
@@ -105,7 +139,17 @@ namespace System.Reactive.Subjects
         /// <param name="value">The value to send to all currently subscribed observers.</param>
         public override void OnNext(T value)
         {
-            _observer.OnNext(value);
+            var observers = Volatile.Read(ref _observers);
+            if (observers == DISPOSED)
+            {
+                _exception = null;
+                ThrowDisposed();
+                return;
+            }
+            foreach (var observer in observers)
+            {
+                observer.Observer?.OnNext(value);
+            }
         }
 
         #endregion
@@ -123,59 +167,92 @@ namespace System.Reactive.Subjects
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var oldObserver = default(IObserver<T>);
-            var newObserver = default(IObserver<T>);
-
-            do
+            var disposable = default(SubjectDisposable);
+            for (; ; )
             {
-                oldObserver = _observer;
+                var observers = Volatile.Read(ref _observers);
+                if (observers == DISPOSED)
+                {
+                    _exception = null;
+                    ThrowDisposed();
+                    break;
+                }
+                if (observers == TERMINATED)
+                {
+                    var ex = _exception;
+                    if (ex != null)
+                    {
+                        observer.OnError(ex);
+                    }
+                    else
+                    {
+                        observer.OnCompleted();
+                    }
+                    break;
+                }
 
-                if (oldObserver == DisposedObserver<T>.Instance)
+                if (disposable == null)
                 {
-                    throw new ObjectDisposedException("");
+                    disposable = new SubjectDisposable(this, observer);
                 }
 
-                if (oldObserver == DoneObserver<T>.Completed)
+                var n = observers.Length;
+                var b = new SubjectDisposable[n + 1];
+                Array.Copy(observers, 0, b, 0, n);
+                b[n] = disposable;
+                if (Interlocked.CompareExchange(ref _observers, b, observers) == observers)
                 {
-                    observer.OnCompleted();
-                    return Disposable.Empty;
+                    return disposable;
                 }
+            }
+            return Disposable.Empty;
+        }
 
-                if (oldObserver is DoneObserver<T> done)
+        void Unsubscribe(SubjectDisposable observer)
+        {
+            for (; ; )
+            {
+                var a = Volatile.Read(ref _observers);
+                var n = a.Length;
+                if (n == 0)
+                {
+                    break;
+                }
+
+                var j = Array.IndexOf(a, observer);
+
+                if (j < 0)
                 {
-                    observer.OnError(done.Exception);
-                    return Disposable.Empty;
+                    break;
                 }
 
-                if (oldObserver == NopObserver<T>.Instance)
+                var b = default(SubjectDisposable[]);
+                if (n == 1)
                 {
-                    newObserver = observer;
+                    b = EMPTY;
                 }
                 else
                 {
-                    if (oldObserver is Observer<T> obs)
-                    {
-                        newObserver = obs.Add(observer);
-                    }
-                    else
-                    {
-                        newObserver = new Observer<T>(new ImmutableList<IObserver<T>>(new[] { oldObserver, observer }));
-                    }
+                    b = new SubjectDisposable[n - 1];
+                    Array.Copy(a, 0, b, 0, j);
+                    Array.Copy(a, j + 1, b, j, n - j - 1);
                 }
-            } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
-
-            return new Subscription(this, observer);
+                if (Interlocked.CompareExchange(ref _observers, b, a) == a)
+                {
+                    break;
+                }
+            }
         }
 
-        private sealed class Subscription : IDisposable
+        private sealed class SubjectDisposable : IDisposable
         {
             private Subject<T> _subject;
             private IObserver<T> _observer;
 
-            public Subscription(Subject<T> subject, IObserver<T> observer)
+            public SubjectDisposable(Subject<T> subject, IObserver<T> observer)
             {
                 _subject = subject;
-                _observer = observer;
+                Volatile.Write(ref _observer, observer);
             }
 
             public void Dispose()
@@ -184,35 +261,11 @@ namespace System.Reactive.Subjects
                 if (observer == null)
                     return;
 
-                _subject.Unsubscribe(observer);
+                _subject.Unsubscribe(this);
                 _subject = null;
             }
-        }
-
-        private void Unsubscribe(IObserver<T> observer)
-        {
-            var oldObserver = default(IObserver<T>);
-            var newObserver = default(IObserver<T>);
 
-            do
-            {
-                oldObserver = _observer;
-
-                if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
-                    return;
-
-                if (oldObserver is Observer<T> obs)
-                {
-                    newObserver = obs.Remove(observer);
-                }
-                else
-                {
-                    if (oldObserver != observer)
-                        return;
-
-                    newObserver = NopObserver<T>.Instance;
-                }
-            } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
+            public IObserver<T> Observer { get { return Volatile.Read(ref _observer); } }
         }
 
         #endregion
@@ -224,7 +277,8 @@ namespace System.Reactive.Subjects
         /// </summary>
         public override void Dispose()
         {
-            _observer = DisposedObserver<T>.Instance;
+            Interlocked.Exchange(ref _observers, DISPOSED);
+            _exception = null;
         }
 
         #endregion