Browse Source

4.x: Rework & fix SkipUntil with lock-free methods (#551)

David Karnok 7 years ago
parent
commit
c6766f50c9
1 changed files with 88 additions and 48 deletions
  1. 88 48
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

+ 88 - 48
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

@@ -4,6 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -24,6 +25,12 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TSource>
         {
+            IDisposable _mainDisposable;
+            IDisposable _otherDisposable;
+            volatile bool _forward;
+            int _halfSerializer;
+            Exception _error;
+
             public _(IObserver<TSource> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
@@ -31,95 +38,128 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public IDisposable Run(SkipUntil<TSource, TOther> parent)
             {
-                var sourceObserver = new SourceObserver(this);
-                var otherObserver = new OtherObserver(this, sourceObserver);
-
-                var otherSubscription = parent._other.SubscribeSafe(otherObserver);
-                var sourceSubscription = parent._source.SubscribeSafe(sourceObserver);
-                
-                sourceObserver.Disposable = sourceSubscription;
-                otherObserver.Disposable = otherSubscription;
-
-                return StableCompositeDisposable.Create(
-                    sourceSubscription,
-                    otherSubscription
-                );
+                Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this)));
+
+                Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this));
+
+                return this;
             }
 
-            private sealed class SourceObserver : IObserver<TSource>
+            protected override void Dispose(bool disposing)
             {
-                private readonly _ _parent;
-                public volatile bool _forward;
-                private readonly SingleAssignmentDisposable _subscription;
-
-                public SourceObserver(_ parent)
+                if (disposing)
                 {
-                    _parent = parent;
-                    _subscription = new SingleAssignmentDisposable();
+                    DisposeMain();
+                    if (!Disposable.GetIsDisposed(ref _otherDisposable))
+                    {
+                        Disposable.TryDispose(ref _otherDisposable);
+                    }
                 }
 
-                public IDisposable Disposable
+                base.Dispose(disposing);
+            }
+
+            void DisposeMain()
+            {
+                if (!Disposable.GetIsDisposed(ref _mainDisposable))
                 {
-                    set { _subscription.Disposable = value; }
+                    Disposable.TryDispose(ref _mainDisposable);
                 }
+            }
 
-                public void OnNext(TSource value)
+            public override void OnNext(TSource value)
+            {
+                if (_forward)
                 {
-                    if (_forward)
-                        _parent.ForwardOnNext(value);
+                    if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0)
+                    {
+                        ForwardOnNext(value);
+                        if (Interlocked.Decrement(ref _halfSerializer) != 0)
+                        {
+                            var ex = _error;
+                            _error = SkipUntilTerminalException.Instance;
+                            ForwardOnError(ex);
+                        }
+                    }
                 }
+            }
 
-                public void OnError(Exception error)
+            public override void OnError(Exception ex)
+            {
+                if (Interlocked.CompareExchange(ref _error, ex, null) == null)
                 {
-                    _parent.ForwardOnError(error);
+                    if (Interlocked.Increment(ref _halfSerializer) == 1)
+                    {
+                        _error = SkipUntilTerminalException.Instance;
+                        ForwardOnError(ex);
+                    }
                 }
+            }
 
-                public void OnCompleted()
+            public override void OnCompleted()
+            {
+                if (_forward)
                 {
-                    if (_forward)
-                        _parent.ForwardOnCompleted();
-
-                    _subscription.Dispose(); // We can't cancel the other stream yet, it may be on its way to dispatch an OnError message and we don't want to have a race.
+                    if (Interlocked.CompareExchange(ref _error, SkipUntilTerminalException.Instance, null) == null)
+                    {
+                        if (Interlocked.Increment(ref _halfSerializer) == 1)
+                        {
+                            ForwardOnCompleted();
+                        }
+                    }
+                }
+                else
+                {
+                    DisposeMain();
                 }
             }
 
-            private sealed class OtherObserver : IObserver<TOther>
+            void OtherComplete()
+            {
+                _forward = true;
+            }
+
+            sealed class OtherObserver : IObserver<TOther>, IDisposable
             {
-                private readonly _ _parent;
-                private readonly SourceObserver _sourceObserver;
-                private readonly SingleAssignmentDisposable _subscription;
+                readonly _ _parent;
 
-                public OtherObserver(_ parent, SourceObserver sourceObserver)
+                public OtherObserver(_ parent)
                 {
                     _parent = parent;
-                    _sourceObserver = sourceObserver;
-                    _subscription = new SingleAssignmentDisposable();
                 }
 
-                public IDisposable Disposable
+                public void Dispose()
                 {
-                    set { _subscription.Disposable = value; }
+                    if (!Disposable.GetIsDisposed(ref _parent._otherDisposable))
+                    {
+                        Disposable.TryDispose(ref _parent._otherDisposable);
+                    }
                 }
 
-                public void OnNext(TOther value)
+                public void OnCompleted()
                 {
-                    _sourceObserver._forward = true;
-                    _subscription.Dispose();
+                    Dispose();
                 }
 
                 public void OnError(Exception error)
                 {
-                    _parent.ForwardOnError(error);
+                    _parent.OnError(error);
                 }
 
-                public void OnCompleted()
+                public void OnNext(TOther value)
                 {
-                    _subscription.Dispose();
+                    _parent.OtherComplete();
+                    Dispose();
                 }
             }
         }
     }
 
+    internal static class SkipUntilTerminalException
+    {
+        internal static readonly Exception Instance = new Exception("No further exceptions");
+    }
+
     internal sealed class SkipUntil<TSource> : Producer<TSource, SkipUntil<TSource>._>
     {
         private readonly IObservable<TSource> _source;