Amb.cs 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  /// <summary>
  /// Relays the signals of whichever of two source sequences signals first
  /// and disposes the subscription to the other one.
  /// </summary>
  /// <typeparam name="TSource">Element type of both sources and of the resulting sequence.</typeparam>
  internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>.AmbCoordinator>
  {
      private readonly IObservable<TSource> _left;
      private readonly IObservable<TSource> _right;

      /// <summary>
      /// Stores the two sources that will race once an observer subscribes.
      /// </summary>
      /// <param name="left">The source that is subscribed to first.</param>
      /// <param name="right">The source that is subscribed to second.</param>
      public Amb(IObservable<TSource> left, IObservable<TSource> right)
      {
          _left = left;
          _right = right;
      }

      // The cancel handle is not used: the coordinator itself is the IDisposable handed back from Run.
      protected override AmbCoordinator CreateSink(IObserver<TSource> observer, IDisposable cancel) => new AmbCoordinator(observer);

      protected override IDisposable Run(AmbCoordinator sink) => sink.Run(_left, _right);

      /// <summary>
      /// Owns the two racing observers and decides which one of them is allowed
      /// to emit to the downstream observer.
      /// </summary>
      internal sealed class AmbCoordinator : IDisposable
      {
          // Possible values of _winner.
          private const int NoWinner = 0;
          private const int LeftWon = 1;
          private const int RightWon = 2;

          private readonly AmbObserver _leftObserver;
          private readonly AmbObserver _rightObserver;

          /// <summary>
          /// Starts as <c>NoWinner</c> and is moved exactly once, by a compare-and-swap
          /// in <see cref="TryWin"/>, to <c>LeftWon</c> or <c>RightWon</c>.
          /// </summary>
          private int _winner;

          /// <summary>
          /// Creates the left and right observers, both targeting the same downstream observer.
          /// </summary>
          /// <param name="observer">The downstream observer that receives the winner's signals.</param>
          public AmbCoordinator(IObserver<TSource> observer)
          {
              _leftObserver = new AmbObserver(observer, this, true);
              _rightObserver = new AmbObserver(observer, this, false);
          }

          /// <summary>
          /// Subscribes to both sources, left first, and hands each subscription to its observer.
          /// </summary>
          /// <param name="left">The first source to subscribe to.</param>
          /// <param name="right">The second source to subscribe to.</param>
          /// <returns>This coordinator; disposing it disposes both observers.</returns>
          public IDisposable Run(IObservable<TSource> left, IObservable<TSource> right)
          {
              // If a source signals while Subscribe is still running, the affected observer
              // may already be disposed when OnSubscribe is called; OnSubscribe then
              // disposes the subscription it is handed instead of storing it.
              _leftObserver.OnSubscribe(left.Subscribe(_leftObserver));
              _rightObserver.OnSubscribe(right.Subscribe(_rightObserver));
              return this;
          }

          /// <summary>
          /// Disposes both observers and, through them, their upstream subscriptions.
          /// </summary>
          public void Dispose()
          {
              _leftObserver.Dispose();
              _rightObserver.Dispose();
          }

          /// <summary>
          /// Try winning the race for the right of emission.
          /// </summary>
          /// <param name="isLeft">If true, the contender is the left source.</param>
          /// <returns>True if the contender has won the race.</returns>
          public bool TryWin(bool isLeft)
          {
              var index = isLeft ? LeftWon : RightWon;

              // The contender has already been recorded as the winner.
              if (Volatile.Read(ref _winner) == index)
              {
                  return true;
              }

              // Only one contender can move the state away from NoWinner;
              // the one that succeeds disposes its opponent.
              if (Interlocked.CompareExchange(ref _winner, index, NoWinner) == NoWinner)
              {
                  (isLeft ? _rightObserver : _leftObserver).Dispose();
                  return true;
              }

              return false;
          }

          /// <summary>
          /// Observes one of the two sources and forwards its signals downstream
          /// only if that source has won the race.
          /// </summary>
          private sealed class AmbObserver : IObserver<TSource>, IDisposable
          {
              private readonly IObserver<TSource> _downstream;
              private readonly AmbCoordinator _parent;
              private readonly bool _isLeft;

              /// <summary>
              /// The subscription to the observed source, or
              /// <c>BooleanDisposable.True</c> once this observer has been disposed.
              /// </summary>
              private IDisposable _upstream;

              /// <summary>
              /// If true, this observer won the race and now can emit
              /// on a fast path.
              /// </summary>
              private bool _iwon;

              public AmbObserver(IObserver<TSource> downstream, AmbCoordinator parent, bool isLeft)
              {
                  _downstream = downstream;
                  _parent = parent;
                  _isLeft = isLeft;
              }

              /// <summary>
              /// Stores the upstream subscription, or disposes it right away when the
              /// field is already occupied (for example because this observer was disposed).
              /// </summary>
              /// <param name="d">The subscription returned by subscribing this observer.</param>
              internal void OnSubscribe(IDisposable d)
              {
                  if (Interlocked.CompareExchange(ref _upstream, d, null) != null)
                  {
                      d?.Dispose();
                  }
              }

              /// <summary>
              /// Disposes the current upstream subscription, if any, and leaves
              /// <c>BooleanDisposable.True</c> in its place so that a later
              /// <see cref="OnSubscribe"/> finds the field non-null and disposes its argument.
              /// </summary>
              public void Dispose()
              {
                  Interlocked.Exchange(ref _upstream, BooleanDisposable.True)?.Dispose();
              }

              /// <summary>
              /// Forwards completion downstream if this observer is, or becomes, the winner;
              /// disposes this observer in every case.
              /// </summary>
              public void OnCompleted()
              {
                  if (EnsureWinner())
                  {
                      _downstream.OnCompleted();
                  }

                  Dispose();
              }

              /// <summary>
              /// Forwards the error downstream if this observer is, or becomes, the winner;
              /// disposes this observer in every case.
              /// </summary>
              /// <param name="error">The error signalled by the observed source.</param>
              public void OnError(Exception error)
              {
                  if (EnsureWinner())
                  {
                      _downstream.OnError(error);
                  }

                  Dispose();
              }

              /// <summary>
              /// Forwards the value downstream if this observer is, or becomes, the winner;
              /// otherwise the value is dropped.
              /// </summary>
              /// <param name="value">The value signalled by the observed source.</param>
              public void OnNext(TSource value)
              {
                  if (EnsureWinner())
                  {
                      _downstream.OnNext(value);
                  }
              }

              /// <summary>
              /// Checks the cached win flag first and only asks the coordinator when it is not set.
              /// </summary>
              /// <returns>True if this observer may emit to the downstream observer.</returns>
              private bool EnsureWinner()
              {
                  if (_iwon)
                  {
                      return true;
                  }

                  if (_parent.TryWin(_isLeft))
                  {
                      _iwon = true;
                      return true;
                  }

                  return false;
              }
          }
      }
  }
  131. }