123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- using System.Reactive.Disposables;
- namespace System.Reactive.Linq.ObservableImpl
- {
- internal sealed class Amb<TSource> : Producer<TSource>
- {
- private readonly IObservable<TSource> _left;
- private readonly IObservable<TSource> _right;
- public Amb(IObservable<TSource> left, IObservable<TSource> right)
- {
- _left = left;
- _right = right;
- }
- protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
- {
- var sink = new _(this, observer, cancel);
- setSink(sink);
- return sink.Run();
- }
- class _ : Sink<TSource>
- {
- private readonly Amb<TSource> _parent;
- public _(Amb<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
- : base(observer, cancel)
- {
- _parent = parent;
- }
- private AmbState _choice;
- public IDisposable Run()
- {
- var ls = new SingleAssignmentDisposable();
- var rs = new SingleAssignmentDisposable();
- var d = StableCompositeDisposable.Create(ls, rs);
- var gate = new object();
- var lo = new AmbObserver();
- lo._disposable = d;
- lo._target = new DecisionObserver(this, gate, AmbState.Left, ls, rs, lo);
- var ro = new AmbObserver();
- ro._disposable = d;
- ro._target = new DecisionObserver(this, gate, AmbState.Right, rs, ls, ro);
- _choice = AmbState.Neither;
- ls.Disposable = _parent._left.SubscribeSafe(lo);
- rs.Disposable = _parent._right.SubscribeSafe(ro);
- return d;
- }
- class DecisionObserver : IObserver<TSource>
- {
- private readonly _ _parent;
- private readonly AmbState _me;
- private readonly IDisposable _subscription;
- private readonly IDisposable _otherSubscription;
- private readonly object _gate;
- private readonly AmbObserver _observer;
- public DecisionObserver(_ parent, object gate, AmbState me, IDisposable subscription, IDisposable otherSubscription, AmbObserver observer)
- {
- _parent = parent;
- _gate = gate;
- _me = me;
- _subscription = subscription;
- _otherSubscription = otherSubscription;
- _observer = observer;
- }
- public void OnNext(TSource value)
- {
- lock (_gate)
- {
- if (_parent._choice == AmbState.Neither)
- {
- _parent._choice = _me;
- _otherSubscription.Dispose();
- _observer._disposable = _subscription;
- _observer._target = _parent._observer;
- }
- if (_parent._choice == _me)
- _parent._observer.OnNext(value);
- }
- }
- public void OnError(Exception error)
- {
- lock (_gate)
- {
- if (_parent._choice == AmbState.Neither)
- {
- _parent._choice = _me;
- _otherSubscription.Dispose();
- _observer._disposable = _subscription;
- _observer._target = _parent._observer;
- }
- if (_parent._choice == _me)
- {
- _parent._observer.OnError(error);
- _parent.Dispose();
- }
- }
- }
- public void OnCompleted()
- {
- lock (_gate)
- {
- if (_parent._choice == AmbState.Neither)
- {
- _parent._choice = _me;
- _otherSubscription.Dispose();
- _observer._disposable = _subscription;
- _observer._target = _parent._observer;
- }
- if (_parent._choice == _me)
- {
- _parent._observer.OnCompleted();
- _parent.Dispose();
- }
- }
- }
- }
- class AmbObserver : IObserver<TSource>
- {
- public IObserver<TSource> _target;
- public IDisposable _disposable;
- public void OnNext(TSource value)
- {
- _target.OnNext(value);
- }
- public void OnError(Exception error)
- {
- _target.OnError(error);
- _disposable.Dispose();
- }
- public void OnCompleted()
- {
- _target.OnCompleted();
- _disposable.Dispose();
- }
- }
- enum AmbState
- {
- Left,
- Right,
- Neither
- }
- }
- }
- }
|