فهرست منبع

4.x: Improve Amb (#512)

David Karnok 7 سال پیش
والد
کامیت
c961295c14
1فایلهای تغییر یافته به همراه95 افزوده شده و 108 حذف شده
  1. 95 108
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs

+ 95 - 108
Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs

@@ -3,10 +3,11 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Disposables;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>._>
+    internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>.AmbCoordinator>
     {
         private readonly IObservable<TSource> _left;
         private readonly IObservable<TSource> _right;
@@ -17,152 +18,138 @@ namespace System.Reactive.Linq.ObservableImpl
             _right = right;
         }
 
-        protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
+        protected override AmbCoordinator CreateSink(IObserver<TSource> observer, IDisposable cancel) => new AmbCoordinator(observer);
 
-        protected override IDisposable Run(_ sink) => sink.Run(this);
+        protected override IDisposable Run(AmbCoordinator sink) => sink.Run(_left, _right);
 
-        internal sealed class _ : Sink<TSource>
+        internal sealed class AmbCoordinator : IDisposable
         {
-            public _(IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            readonly AmbObserver leftObserver;
+
+            readonly AmbObserver rightObserver;
+
+            int winner;
+
+            public AmbCoordinator(IObserver<TSource> observer)
             {
+                leftObserver = new AmbObserver(observer, this, true);
+                rightObserver = new AmbObserver(observer, this, false);
             }
 
-            private AmbState _choice;
+            public IDisposable Run(IObservable<TSource> left, IObservable<TSource> right)
+            {
+                leftObserver.OnSubscribe(left.Subscribe(leftObserver));
+                rightObserver.OnSubscribe(right.Subscribe(rightObserver));
+                return this;
+            }
 
-            public IDisposable Run(Amb<TSource> parent)
+            public void Dispose()
             {
-                var ls = new SingleAssignmentDisposable();
-                var rs = new SingleAssignmentDisposable();
-                var d = StableCompositeDisposable.Create(ls, rs);
+                leftObserver.Dispose();
+                rightObserver.Dispose();
+            }
 
-                var gate = new object();
+            /// <summary>
+            /// Try winning the race for the right of emission.
+            /// </summary>
+            /// <param name="isLeft">If true, the contender is the left source.</param>
+            /// <returns>True if the contender has won the race.</returns>
+            public bool TryWin(bool isLeft)
+            {
+                var index = isLeft ? 1 : 2;
+
+                if (Volatile.Read(ref winner) == index)
+                {
+                    return true;
+                }
+                if (Interlocked.CompareExchange(ref winner, index, 0) == 0)
+                {
+                    (isLeft ? rightObserver : leftObserver).Dispose();
+                    return true;
+                }
+                return false;
+            }
 
-                var lo = new AmbObserver();
-                lo._disposable = d;
-                lo._target = new DecisionObserver(this, gate, AmbState.Left, ls, rs, lo);
+            sealed class AmbObserver : IObserver<TSource>, IDisposable
+            {
+                readonly IObserver<TSource> downstream;
 
-                var ro = new AmbObserver();
-                ro._disposable = d;
-                ro._target = new DecisionObserver(this, gate, AmbState.Right, rs, ls, ro);
+                readonly AmbCoordinator parent;
 
-                _choice = AmbState.Neither;
+                readonly bool isLeft;
 
-                ls.Disposable = parent._left.SubscribeSafe(lo);
-                rs.Disposable = parent._right.SubscribeSafe(ro);
+                IDisposable upstream;
 
-                return d;
-            }
+                /// <summary>
+                /// If true, this observer won the race and now can emit
+                /// on a fast path.
+                /// </summary>
+                bool iwon;
 
-            private sealed class DecisionObserver : IObserver<TSource>
-            {
-                private readonly _ _parent;
-                private readonly AmbState _me;
-                private readonly IDisposable _subscription;
-                private readonly IDisposable _otherSubscription;
-                private readonly object _gate;
-                private readonly AmbObserver _observer;
-
-                public DecisionObserver(_ parent, object gate, AmbState me, IDisposable subscription, IDisposable otherSubscription, AmbObserver observer)
+                public AmbObserver(IObserver<TSource> downstream, AmbCoordinator parent, bool isLeft)
                 {
-                    _parent = parent;
-                    _gate = gate;
-                    _me = me;
-                    _subscription = subscription;
-                    _otherSubscription = otherSubscription;
-                    _observer = observer;
+                    this.downstream = downstream;
+                    this.parent = parent;
+                    this.isLeft = isLeft;
                 }
 
-                public void OnNext(TSource value)
+                internal void OnSubscribe(IDisposable d)
                 {
-                    lock (_gate)
+                    if (Interlocked.CompareExchange(ref upstream, d, null) != null)
                     {
-                        if (_parent._choice == AmbState.Neither)
-                        {
-                            _parent._choice = _me;
-                            _otherSubscription.Dispose();
-                            _observer._disposable = _subscription;
-                            _observer._target = _parent._observer;
-                        }
-
-                        if (_parent._choice == _me)
-                        {
-                            _parent._observer.OnNext(value);
-                        }
+                        d?.Dispose();
                     }
                 }
 
-                public void OnError(Exception error)
+                public void Dispose()
                 {
-                    lock (_gate)
-                    {
-                        if (_parent._choice == AmbState.Neither)
-                        {
-                            _parent._choice = _me;
-                            _otherSubscription.Dispose();
-                            _observer._disposable = _subscription;
-                            _observer._target = _parent._observer;
-                        }
-
-                        if (_parent._choice == _me)
-                        {
-                            _parent._observer.OnError(error);
-                            _parent.Dispose();
-                        }
-                    }
+                    Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
                 }
 
                 public void OnCompleted()
                 {
-                    lock (_gate)
+                    if (iwon)
                     {
-                        if (_parent._choice == AmbState.Neither)
-                        {
-                            _parent._choice = _me;
-                            _otherSubscription.Dispose();
-                            _observer._disposable = _subscription;
-                            _observer._target = _parent._observer;
-                        }
-
-                        if (_parent._choice == _me)
-                        {
-                            _parent._observer.OnCompleted();
-                            _parent.Dispose();
-                        }
+                        downstream.OnCompleted();
                     }
-                }
-            }
-
-            private sealed class AmbObserver : IObserver<TSource>
-            {
-                public IObserver<TSource> _target;
-
-                public IDisposable _disposable;
-
-                public void OnNext(TSource value)
-                {
-                    _target.OnNext(value);
+                    else
+                    if (parent.TryWin(isLeft))
+                    {
+                        iwon = true;
+                        downstream.OnCompleted();
+                    }
+                    Dispose();
                 }
 
                 public void OnError(Exception error)
                 {
-                    _target.OnError(error);
-                    _disposable.Dispose();
+                    if (iwon)
+                    {
+                        downstream.OnError(error);
+                    }
+                    else
+                    if (parent.TryWin(isLeft))
+                    {
+                        iwon = true;
+                        downstream.OnError(error);
+                    }
+                    Dispose();
                 }
 
-                public void OnCompleted()
+                public void OnNext(TSource value)
                 {
-                    _target.OnCompleted();
-                    _disposable.Dispose();
+                    if (iwon)
+                    {
+                        downstream.OnNext(value);
+                    }
+                    else
+                    if (parent.TryWin(isLeft))
+                    {
+                        iwon = true;
+                        downstream.OnNext(value);
+                    }
                 }
             }
-
-            private enum AmbState
-            {
-                Left,
-                Right,
-                Neither,
-            }
         }
     }
 }