Kaynağa Gözat

4.x: Rework TakeUntil with lock-free methods (#550)

David Karnok 7 yıl önce
ebeveyn
işleme
caf620353c

+ 60 - 74
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

@@ -4,6 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -24,6 +25,11 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TSource>
         {
+            private IDisposable _mainDisposable;
+            private IDisposable _otherDisposable;
+            private int _halfSerializer;
+            private Exception _error;
+
             public _(IObserver<TSource> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
@@ -31,124 +37,104 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public IDisposable Run(TakeUntil<TSource, TOther> parent)
             {
-                var sourceObserver = new SourceObserver(this);
-                var otherObserver = new OtherObserver(this, sourceObserver);
-
-                // COMPAT - Order of Subscribe calls per v1.0.10621
-                var otherSubscription = parent._other.SubscribeSafe(otherObserver);
-                otherObserver.Disposable = otherSubscription;
-
-                var sourceSubscription = parent._source.SubscribeSafe(sourceObserver);
+                Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this)));
+                Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this));
 
-                return StableCompositeDisposable.Create(
-                    otherSubscription,
-                    sourceSubscription
-                );
+                return this;
             }
 
-            /*
-             * We tried a more fine-grained synchronization scheme to make TakeUntil more efficient, but
-             * this requires several CAS instructions, which quickly add up to being non-beneficial.
-             * 
-             * Notice an approach where the "other" channel performs an Interlocked.Exchange operation on
-             * the _parent._observer field to substitute it with a NopObserver<TSource> doesn't work,
-             * because the "other" channel still needs to send an OnCompleted message, which could happen
-             * concurrently with another message when the "source" channel has already read from the
-             * _parent._observer field between making the On* call.
-             * 
-             * Fixing this issue requires an ownership transfer mechanism for channels to get exclusive
-             * access to the outgoing observer while dispatching a message. Doing this more fine-grained
-             * than using locks turns out to be tricky and doesn't reduce cost.
-             */
-            private sealed class SourceObserver : IObserver<TSource>
+            protected override void Dispose(bool disposing)
             {
-                private readonly _ _parent;
-                public volatile bool _open;
-
-                public SourceObserver(_ parent)
+                if (disposing)
                 {
-                    _parent = parent;
-                    _open = false;
+                    if (!Disposable.GetIsDisposed(ref _mainDisposable))
+                    {
+                        Disposable.TryDispose(ref _mainDisposable);
+                        Disposable.TryDispose(ref _otherDisposable);
+                    }
                 }
 
-                public void OnNext(TSource value)
+                base.Dispose(disposing);
+            }
+
+            public override void OnNext(TSource value)
+            {
+                if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0)
                 {
-                    if (_open)
-                    {
-                        _parent.ForwardOnNext(value);
-                    }
-                    else
+                    ForwardOnNext(value);
+                    if (Interlocked.Decrement(ref _halfSerializer) != 0)
                     {
-                        lock (_parent)
+                        var ex = _error;
+                        if (ex != TakeUntilTerminalException.Instance)
+                        {
+                            _error = TakeUntilTerminalException.Instance;
+                            ForwardOnError(ex);
+                        }
+                        else
                         {
-                            _parent.ForwardOnNext(value);
+                            ForwardOnCompleted();
                         }
                     }
                 }
+            }
 
-                public void OnError(Exception error)
+            public override void OnError(Exception ex)
+            {
+                if (Interlocked.CompareExchange(ref _error, ex, null) == null)
                 {
-                    lock (_parent)
+                    if (Interlocked.Increment(ref _halfSerializer) == 1)
                     {
-                        _parent.ForwardOnError(error);
+                        _error = TakeUntilTerminalException.Instance;
+                        ForwardOnError(ex);
                     }
                 }
+            }
 
-                public void OnCompleted()
+            public override void OnCompleted()
+            {
+                if (Interlocked.CompareExchange(ref _error, TakeUntilTerminalException.Instance, null) == null)
                 {
-                    lock (_parent)
+                    if (Interlocked.Increment(ref _halfSerializer) == 1)
                     {
-                        _parent.ForwardOnCompleted();
+                        ForwardOnCompleted();
                     }
                 }
             }
 
-            private sealed class OtherObserver : IObserver<TOther>
+            sealed class OtherObserver : IObserver<TOther>
             {
-                private readonly _ _parent;
-                private readonly SourceObserver _sourceObserver;
-                private readonly SingleAssignmentDisposable _subscription;
+                readonly _ _parent;
 
-                public OtherObserver(_ parent, SourceObserver sourceObserver)
+                public OtherObserver(_ parent)
                 {
                     _parent = parent;
-                    _sourceObserver = sourceObserver;
-                    _subscription = new SingleAssignmentDisposable();
-                }
-
-                public IDisposable Disposable
-                {
-                    set { _subscription.Disposable = value; }
                 }
 
-                public void OnNext(TOther value)
+                public void OnCompleted()
                 {
-                    lock (_parent)
-                    {
-                        _parent.ForwardOnCompleted();
-                    }
+                    // Completion doesn't mean termination in Rx.NET for this operator
+                    Disposable.TryDispose(ref _parent._otherDisposable);
                 }
 
                 public void OnError(Exception error)
                 {
-                    lock (_parent)
-                    {
-                        _parent.ForwardOnError(error);
-                    }
+                    _parent.OnError(error);
                 }
 
-                public void OnCompleted()
+                public void OnNext(TOther value)
                 {
-                    lock (_parent)
-                    {
-                        _sourceObserver._open = true;
-                        _subscription.Dispose();
-                    }
+                    _parent.OnCompleted();
                 }
             }
+
         }
     }
 
+    internal static class TakeUntilTerminalException
+    {
+        internal static readonly Exception Instance = new Exception("No further exceptions");
+    }
+
     internal sealed class TakeUntil<TSource> : Producer<TSource, TakeUntil<TSource>._>
     {
         private readonly IObservable<TSource> _source;