Răsfoiți Sursa

4.x: Improve AsyncSubject by using lock-free methods (#510)

David Karnok 7 ani în urmă
părinte
comite
f600a696f7
1 a modificat fișierele cu 195 adăugiri și 114 ștergeri
  1. 195 114
      Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs

+ 195 - 114
Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs

@@ -17,15 +17,27 @@ namespace System.Reactive.Subjects
     {
         #region Fields
 
-        private readonly object _gate = new object();
+        private AsyncSubjectDisposable[] _observers;
 
-        private ImmutableList<IObserver<T>> _observers;
-        private bool _isDisposed;
-        private bool _isStopped;
         private T _value;
         private bool _hasValue;
         private Exception _exception;
 
+        /// <summary>
+        /// A pre-allocated empty array for the no-observers state.
+        /// </summary>
+        static readonly AsyncSubjectDisposable[] EMPTY = new AsyncSubjectDisposable[0];
+
+        /// <summary>
+        /// A pre-allocated empty array indicating the AsyncSubject has terminated
+        /// </summary>
+        static readonly AsyncSubjectDisposable[] TERMINATED = new AsyncSubjectDisposable[0];
+
+        /// <summary>
+        /// A pre-allocated empty array indicating the AsyncSubject has terminated
+        /// </summary>
+        static readonly AsyncSubjectDisposable[] DISPOSED = new AsyncSubjectDisposable[0];
+
         #endregion
 
         #region Constructors
@@ -35,7 +47,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         public AsyncSubject()
         {
-            _observers = ImmutableList<IObserver<T>>.Empty;
+            _observers = EMPTY;
         }
 
         #endregion
@@ -45,21 +57,12 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public override bool HasObservers => _observers?.Data.Length > 0;
+        public override bool HasObservers => _observers.Length != 0;
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
         /// </summary>
-        public override bool IsDisposed
-        {
-            get
-            {
-                lock (_gate)
-                {
-                    return _isDisposed;
-                }
-            }
-        }
+        public override bool IsDisposed => Volatile.Read(ref _observers) == DISPOSED;
 
         #endregion
 
@@ -72,39 +75,44 @@ namespace System.Reactive.Subjects
         /// </summary>
         public override void OnCompleted()
         {
-            var os = default(IObserver<T>[]);
-
-            var v = default(T);
-            var hv = false;
-            lock (_gate)
+            for (; ; )
             {
-                CheckDisposed();
-
-                if (!_isStopped)
+                var observers = Volatile.Read(ref this._observers);
+                if (observers == DISPOSED)
                 {
-                    os = _observers.Data;
-                    _observers = ImmutableList<IObserver<T>>.Empty;
-                    _isStopped = true;
-                    v = _value;
-                    hv = _hasValue;
+                    _exception = null;
+                    ThrowDisposed();
+                    break;
                 }
-            }
-
-            if (os != null)
-            {
-                if (hv)
+                if (observers == TERMINATED)
                 {
-                    foreach (var o in os)
-                    {
-                        o.OnNext(v);
-                        o.OnCompleted();
-                    }
+                    break;
                 }
-                else
+                if (Interlocked.CompareExchange(ref _observers, TERMINATED, observers) == observers)
                 {
-                    foreach (var o in os)
+                    var hasValue = this._hasValue;
+                    if (hasValue)
+                    {
+                        var value = this._value;
+
+                        foreach (var o in observers)
+                        {
+                            if (!o.IsDisposed())
+                            {
+                                o.downstream.OnNext(value);
+                                o.downstream.OnCompleted();
+                            }
+                        }
+                    }
+                    else
                     {
-                        o.OnCompleted();
+                        foreach (var o in observers)
+                        {
+                            if (!o.IsDisposed())
+                            {
+                                o.downstream.OnCompleted();
+                            }
+                        }
                     }
                 }
             }
@@ -120,27 +128,34 @@ namespace System.Reactive.Subjects
             if (error == null)
                 throw new ArgumentNullException(nameof(error));
 
-            var os = default(IObserver<T>[]);
-            lock (_gate)
+            for (; ; )
             {
-                CheckDisposed();
-
-                if (!_isStopped)
+                var observers = Volatile.Read(ref this._observers);
+                if (observers == DISPOSED)
                 {
-                    os = _observers.Data;
-                    _observers = ImmutableList<IObserver<T>>.Empty;
-                    _isStopped = true;
-                    _exception = error;
+                    _exception = null;
+                    _value = default(T);
+                    ThrowDisposed();
+                    break;
                 }
-            }
-
-            if (os != null)
-            {
-                foreach (var o in os)
+                if (observers == TERMINATED)
                 {
-                    o.OnError(error);
+                    break;
+                }
+                this._exception = error;
+                if (Interlocked.CompareExchange(ref _observers, TERMINATED, observers) == observers)
+                {
+                    var ex = this._exception;
+                    foreach (var o in observers)
+                    {
+                        if (!o.IsDisposed())
+                        {
+                            o.downstream.OnError(error);
+                        }
+                    }
                 }
             }
+
         }
 
         /// <summary>
@@ -149,16 +164,21 @@ namespace System.Reactive.Subjects
         /// <param name="value">The value to store in the subject.</param>
         public override void OnNext(T value)
         {
-            lock (_gate)
+            var observers = Volatile.Read(ref this._observers);
+            if (observers == DISPOSED)
             {
-                CheckDisposed();
-
-                if (!_isStopped)
-                {
-                    _value = value;
-                    _hasValue = true;
-                }
+                _value = default(T);
+                _exception = null;
+                ThrowDisposed();
+                return;
+            }
+            if (observers == TERMINATED)
+            {
+                return;
             }
+
+            _value = value;
+            _hasValue = true;
         }
 
         #endregion
@@ -176,66 +196,129 @@ namespace System.Reactive.Subjects
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var ex = default(Exception);
-            var v = default(T);
-            var hv = false;
+            var parent = new AsyncSubjectDisposable(this, observer);
 
-            lock (_gate)
+            if (!Add(parent))
             {
-                CheckDisposed();
-
-                if (!_isStopped)
+                var ex = _exception;
+                if (ex != null)
                 {
-                    _observers = _observers.Add(observer);
-                    return new Subscription(this, observer);
+                    observer.OnError(ex);
                 }
-
-                ex = _exception;
-                hv = _hasValue;
-                v = _value;
+                else
+                {
+                    if (_hasValue)
+                    {
+                        observer.OnNext(_value);
+                    }
+                    observer.OnCompleted();
+                }
+                return Disposable.Empty;
             }
 
-            if (ex != null)
-            {
-                observer.OnError(ex);
-            }
-            else if (hv)
+            return parent;
+        }
+
+        bool Add(AsyncSubjectDisposable inner)
+        {
+            for (; ; )
             {
-                observer.OnNext(v);
-                observer.OnCompleted();
+                var a = Volatile.Read(ref _observers);
+                if (a == DISPOSED)
+                {
+                    _value = default(T);
+                    _exception = null;
+                    ThrowDisposed();
+                    return true;
+                }
+
+                if (a == TERMINATED)
+                {
+                    return false;
+                }
+
+                var n = a.Length;
+                var b = new AsyncSubjectDisposable[n + 1];
+                Array.Copy(a, 0, b, 0, n);
+                b[n] = inner;
+                if (Interlocked.CompareExchange(ref _observers, b, a) == a)
+                {
+                    return true;
+                }
             }
-            else
+        }
+
+        void Remove(AsyncSubjectDisposable inner)
+        {
+            for (; ; )
             {
-                observer.OnCompleted();
-            }
+                var a = Volatile.Read(ref _observers);
+
+                var n = a.Length;
 
-            return Disposable.Empty;
+                if (n == 0)
+                {
+                    break;
+                }
+
+                var j = -1;
+
+                for (int i = 0; i < n; i++)
+                {
+                    if (a[i] == inner)
+                    {
+                        j = i;
+                        break;
+                    }
+                }
+
+                if (j < 0)
+                {
+                    break;
+                }
+
+                var b = default(AsyncSubjectDisposable[]);
+                if (n == 1)
+                {
+                    b = EMPTY;
+                }
+                else
+                {
+                    b = new AsyncSubjectDisposable[n - 1];
+                    Array.Copy(a, 0, b, 0, j);
+                    Array.Copy(a, j + 1, b, j, n - j - 1);
+                }
+
+                if (Interlocked.CompareExchange(ref _observers, b, a) == a)
+                {
+                    break;
+                }
+            }
         }
 
-        private sealed class Subscription : IDisposable
+        /// <summary>
+        /// A disposable connecting the AsyncSubject and an IObserver.
+        /// </summary>
+        sealed class AsyncSubjectDisposable : IDisposable
         {
-            private readonly AsyncSubject<T> _subject;
-            private IObserver<T> _observer;
+            internal readonly IObserver<T> downstream;
+
+            AsyncSubject<T> parent;
 
-            public Subscription(AsyncSubject<T> subject, IObserver<T> observer)
+            public AsyncSubjectDisposable(AsyncSubject<T> parent, IObserver<T> downstream)
             {
-                _subject = subject;
-                _observer = observer;
+                this.parent = parent;
+                this.downstream = downstream;
             }
 
             public void Dispose()
             {
-                if (_observer != null)
-                {
-                    lock (_subject._gate)
-                    {
-                        if (!_subject._isDisposed && _observer != null)
-                        {
-                            _subject._observers = _subject._observers.Remove(_observer);
-                            _observer = null;
-                        }
-                    }
-                }
+                Interlocked.Exchange(ref parent, null)?.Remove(this);
+            }
+
+            internal bool IsDisposed()
+            {
+                return Volatile.Read(ref parent) == null;
             }
         }
 
@@ -243,10 +326,9 @@ namespace System.Reactive.Subjects
 
         #region IDisposable implementation
 
-        void CheckDisposed()
+        void ThrowDisposed()
         {
-            if (_isDisposed)
-                throw new ObjectDisposedException(string.Empty);
+            throw new ObjectDisposedException(string.Empty);
         }
 
         /// <summary>
@@ -254,12 +336,11 @@ namespace System.Reactive.Subjects
         /// </summary>
         public override void Dispose()
         {
-            lock (_gate)
+            if (Interlocked.Exchange(ref _observers, DISPOSED) != DISPOSED)
             {
-                _isDisposed = true;
-                _observers = null;
                 _exception = null;
                 _value = default(T);
+                _hasValue = false;
             }
         }
 
@@ -336,7 +417,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Gets whether the AsyncSubject has completed.
         /// </summary>
-        public bool IsCompleted => _isStopped;
+        public bool IsCompleted => Volatile.Read(ref _observers) == TERMINATED;
 
         /// <summary>
         /// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally.
@@ -346,7 +427,7 @@ namespace System.Reactive.Subjects
         [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Await pattern for C# and VB compilers.")]
         public T GetResult()
         {
-            if (!_isStopped)
+            if (Volatile.Read(ref _observers) != TERMINATED)
             {
                 var e = new ManualResetEvent(initialState: false);
                 OnCompleted(() => e.Set(), originalContext: false);