Amb.cs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Reactive.Disposables;
  7. namespace System.Reactive.Linq.ObservableImpl
  8. {
  /// <summary>
  /// Producer for the two-source "amb" operator: both sources are subscribed to, and the
  /// first one to deliver any notification (OnNext, OnError or OnCompleted) is the only
  /// one whose notifications are forwarded downstream.
  /// </summary>
  /// <typeparam name="TSource">Element type of both sources and of the result.</typeparam>
  internal class Amb<TSource> : Producer<TSource>
  {
      private readonly IObservable<TSource> _left;
      private readonly IObservable<TSource> _right;

      /// <summary>
      /// Captures the two competing sources. No subscription happens here.
      /// </summary>
      /// <param name="left">First candidate source.</param>
      /// <param name="right">Second candidate source.</param>
      public Amb(IObservable<TSource> left, IObservable<TSource> right)
      {
          _left = left;
          _right = right;
      }

      /// <summary>
      /// Creates the sink for one downstream observer, publishes it through
      /// <paramref name="setSink"/>, and starts it.
      /// </summary>
      /// <param name="observer">Downstream observer receiving the winner's notifications.</param>
      /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
      /// <param name="setSink">Callback that receives the created sink before it is started.</param>
      /// <returns>The disposable returned by the sink's <c>Run</c> method.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var sink = new _(this, observer, cancel);
          setSink(sink);
          return sink.Run();
      }

      /// <summary>
      /// Per-subscription state: remembers which side (if any) has won and wires up the
      /// observers for both sources.
      /// </summary>
      private class _ : Sink<TSource>
      {
          private readonly Amb<TSource> _parent;

          // Which side has won so far. Assigned Neither in Run() before subscribing and
          // only changed by a DecisionObserver while it holds the shared gate.
          private AmbState _choice;

          public _(Amb<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Subscribes to the left source and then to the right source, each through its
          /// own relay observer whose initial target is a <see cref="DecisionObserver"/>.
          /// </summary>
          /// <returns>A disposable grouping both source subscriptions.</returns>
          public IDisposable Run()
          {
              var leftSubscription = new SingleAssignmentDisposable();
              var rightSubscription = new SingleAssignmentDisposable();
              var bothSubscriptions = StableCompositeDisposable.Create(leftSubscription, rightSubscription);

              // One lock shared by both decision observers so only one side can claim the win.
              var gate = new object();

              // Nothing has been observed yet.
              _choice = AmbState.Neither;

              // Each relay is handed to its own decision observer so the decision observer
              // can re-point the relay once its side has won. Until then a terminal
              // notification seen by the relay disposes both subscriptions.
              var leftRelay = new AmbObserver { _disposable = bothSubscriptions };
              leftRelay._target = new DecisionObserver(this, gate, AmbState.Left, leftSubscription, rightSubscription, leftRelay);

              var rightRelay = new AmbObserver { _disposable = bothSubscriptions };
              rightRelay._target = new DecisionObserver(this, gate, AmbState.Right, rightSubscription, leftSubscription, rightRelay);

              // Left is subscribed first; if it notifies during this call it can win
              // before the right source is subscribed at all.
              leftSubscription.Disposable = _parent._left.SubscribeSafe(leftRelay);
              rightSubscription.Disposable = _parent._right.SubscribeSafe(rightRelay);

              return bothSubscriptions;
          }

          /// <summary>
          /// Initial target of each relay. On every notification it decides, under the
          /// shared gate, whether its side is the winner; only the winner's notifications
          /// reach the downstream observer.
          /// </summary>
          private class DecisionObserver : IObserver<TSource>
          {
              private readonly _ _parent;
              private readonly AmbState _side;
              private readonly IDisposable _subscription;
              private readonly IDisposable _otherSubscription;
              private readonly object _gate;
              private readonly AmbObserver _relay;

              /// <param name="parent">Sink holding the shared choice and the downstream observer.</param>
              /// <param name="gate">Lock shared with the opposite side's decision observer.</param>
              /// <param name="me">The side this observer represents.</param>
              /// <param name="subscription">Subscription to this side's source.</param>
              /// <param name="otherSubscription">Subscription to the opposite source; disposed when this side wins.</param>
              /// <param name="observer">Relay subscribed to this side's source; re-targeted when this side wins.</param>
              public DecisionObserver(_ parent, object gate, AmbState me, IDisposable subscription, IDisposable otherSubscription, AmbObserver observer)
              {
                  _parent = parent;
                  _gate = gate;
                  _side = me;
                  _subscription = subscription;
                  _otherSubscription = otherSubscription;
                  _relay = observer;
              }

              public void OnNext(TSource value)
              {
                  lock (_gate)
                  {
                      if (ClaimIfUndecided())
                      {
                          _parent._observer.OnNext(value);
                      }
                  }
              }

              public void OnError(Exception error)
              {
                  lock (_gate)
                  {
                      if (ClaimIfUndecided())
                      {
                          _parent._observer.OnError(error);
                          _parent.Dispose();
                      }
                  }
              }

              public void OnCompleted()
              {
                  lock (_gate)
                  {
                      if (ClaimIfUndecided())
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                  }
              }

              /// <summary>
              /// Must be called while holding <c>_gate</c>. If no side has won yet, claims the
              /// win for this side: records the choice, disposes the opposite subscription,
              /// and re-points this side's relay at the downstream observer so that later
              /// notifications no longer pass through this class.
              /// </summary>
              /// <returns><c>true</c> when this side is the winner after the call.</returns>
              private bool ClaimIfUndecided()
              {
                  if (_parent._choice == AmbState.Neither)
                  {
                      _parent._choice = _side;
                      _otherSubscription.Dispose();

                      // From here on a terminal notification on the relay only needs to
                      // release this side's own subscription.
                      _relay._disposable = _subscription;
                      _relay._target = _parent._observer;
                  }

                  return _parent._choice == _side;
              }
          }

          /// <summary>
          /// Relay observer actually subscribed to a source. Forwards everything to the
          /// swappable <see cref="_target"/> and disposes <see cref="_disposable"/> after
          /// forwarding a terminal notification.
          /// </summary>
          private class AmbObserver : IObserver<TSource>
          {
              // Current receiver of notifications; assigned in _.Run and reassigned by
              // DecisionObserver when this side wins.
              public IObserver<TSource> _target;

              // Disposed after OnError/OnCompleted has been forwarded.
              public IDisposable _disposable;

              public void OnNext(TSource value)
              {
                  _target.OnNext(value);
              }

              public void OnError(Exception error)
              {
                  _target.OnError(error);
                  _disposable.Dispose();
              }

              public void OnCompleted()
              {
                  _target.OnCompleted();
                  _disposable.Dispose();
              }
          }

          /// <summary>
          /// Outcome of the race between the two sources.
          /// </summary>
          private enum AmbState
          {
              // The left source notified first.
              Left,
              // The right source notified first.
              Right,
              // No source has notified yet.
              Neither
          }
      }
  }
  146. }
  147. #endif