Amb.cs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  7. internal sealed class Amb<TSource> : Producer<TSource>
  8. {
  9. private readonly IObservable<TSource> _left;
  10. private readonly IObservable<TSource> _right;
  11. public Amb(IObservable<TSource> left, IObservable<TSource> right)
  12. {
  13. _left = left;
  14. _right = right;
  15. }
  16. protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  17. {
  18. var sink = new _(this, observer, cancel);
  19. setSink(sink);
  20. return sink.Run();
  21. }
  22. class _ : Sink<TSource>
  23. {
  24. private readonly Amb<TSource> _parent;
  25. public _(Amb<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  26. : base(observer, cancel)
  27. {
  28. _parent = parent;
  29. }
  30. private AmbState _choice;
  31. public IDisposable Run()
  32. {
  33. var ls = new SingleAssignmentDisposable();
  34. var rs = new SingleAssignmentDisposable();
  35. var d = StableCompositeDisposable.Create(ls, rs);
  36. var gate = new object();
  37. var lo = new AmbObserver();
  38. lo._disposable = d;
  39. lo._target = new DecisionObserver(this, gate, AmbState.Left, ls, rs, lo);
  40. var ro = new AmbObserver();
  41. ro._disposable = d;
  42. ro._target = new DecisionObserver(this, gate, AmbState.Right, rs, ls, ro);
  43. _choice = AmbState.Neither;
  44. ls.Disposable = _parent._left.SubscribeSafe(lo);
  45. rs.Disposable = _parent._right.SubscribeSafe(ro);
  46. return d;
  47. }
  48. class DecisionObserver : IObserver<TSource>
  49. {
  50. private readonly _ _parent;
  51. private readonly AmbState _me;
  52. private readonly IDisposable _subscription;
  53. private readonly IDisposable _otherSubscription;
  54. private readonly object _gate;
  55. private readonly AmbObserver _observer;
  56. public DecisionObserver(_ parent, object gate, AmbState me, IDisposable subscription, IDisposable otherSubscription, AmbObserver observer)
  57. {
  58. _parent = parent;
  59. _gate = gate;
  60. _me = me;
  61. _subscription = subscription;
  62. _otherSubscription = otherSubscription;
  63. _observer = observer;
  64. }
  65. public void OnNext(TSource value)
  66. {
  67. lock (_gate)
  68. {
  69. if (_parent._choice == AmbState.Neither)
  70. {
  71. _parent._choice = _me;
  72. _otherSubscription.Dispose();
  73. _observer._disposable = _subscription;
  74. _observer._target = _parent._observer;
  75. }
  76. if (_parent._choice == _me)
  77. _parent._observer.OnNext(value);
  78. }
  79. }
  80. public void OnError(Exception error)
  81. {
  82. lock (_gate)
  83. {
  84. if (_parent._choice == AmbState.Neither)
  85. {
  86. _parent._choice = _me;
  87. _otherSubscription.Dispose();
  88. _observer._disposable = _subscription;
  89. _observer._target = _parent._observer;
  90. }
  91. if (_parent._choice == _me)
  92. {
  93. _parent._observer.OnError(error);
  94. _parent.Dispose();
  95. }
  96. }
  97. }
  98. public void OnCompleted()
  99. {
  100. lock (_gate)
  101. {
  102. if (_parent._choice == AmbState.Neither)
  103. {
  104. _parent._choice = _me;
  105. _otherSubscription.Dispose();
  106. _observer._disposable = _subscription;
  107. _observer._target = _parent._observer;
  108. }
  109. if (_parent._choice == _me)
  110. {
  111. _parent._observer.OnCompleted();
  112. _parent.Dispose();
  113. }
  114. }
  115. }
  116. }
  117. class AmbObserver : IObserver<TSource>
  118. {
  119. public IObserver<TSource> _target;
  120. public IDisposable _disposable;
  121. public void OnNext(TSource value)
  122. {
  123. _target.OnNext(value);
  124. }
  125. public void OnError(Exception error)
  126. {
  127. _target.OnError(error);
  128. _disposable.Dispose();
  129. }
  130. public void OnCompleted()
  131. {
  132. _target.OnCompleted();
  133. _disposable.Dispose();
  134. }
  135. }
  136. enum AmbState
  137. {
  138. Left,
  139. Right,
  140. Neither
  141. }
  142. }
  143. }
  144. }