AmbMany.cs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Reactive.Disposables;
  7. using System.Text;
  8. using System.Threading;
  9. using System.Linq;
  10. namespace System.Reactive.Linq.ObservableImpl
  11. {
  12. internal sealed class AmbManyArray<T> : BasicProducer<T>
  13. {
  14. readonly IObservable<T>[] sources;
  15. public AmbManyArray(IObservable<T>[] sources)
  16. {
  17. this.sources = sources;
  18. }
  19. protected override IDisposable Run(IObserver<T> observer)
  20. {
  21. return AmbCoordinator<T>.Create(observer, sources);
  22. }
  23. }
  24. internal sealed class AmbManyEnumerable<T> : BasicProducer<T>
  25. {
  26. readonly IEnumerable<IObservable<T>> sources;
  27. public AmbManyEnumerable(IEnumerable<IObservable<T>> sources)
  28. {
  29. this.sources = sources;
  30. }
  31. protected override IDisposable Run(IObserver<T> observer)
  32. {
  33. var sourcesEnumerable = this.sources;
  34. var sources = default(IObservable<T>[]);
  35. try
  36. {
  37. sources = sourcesEnumerable.ToArray();
  38. }
  39. catch (Exception ex)
  40. {
  41. observer.OnError(ex);
  42. return Disposable.Empty;
  43. }
  44. return AmbCoordinator<T>.Create(observer, sources);
  45. }
  46. }
  47. internal sealed class AmbCoordinator<T> : IDisposable
  48. {
  49. readonly IObserver<T> downstream;
  50. readonly InnerObserver[] observers;
  51. int winner;
  52. internal AmbCoordinator(IObserver<T> downstream, int n)
  53. {
  54. this.downstream = downstream;
  55. var o = new InnerObserver[n];
  56. for (int i = 0; i < n; i++)
  57. {
  58. o[i] = new InnerObserver(this, i);
  59. }
  60. observers = o;
  61. Volatile.Write(ref winner, -1);
  62. }
  63. internal static IDisposable Create(IObserver<T> observer, IObservable<T>[] sources)
  64. {
  65. var n = sources.Length;
  66. if (n == 0)
  67. {
  68. observer.OnCompleted();
  69. return Disposable.Empty;
  70. }
  71. if (n == 1)
  72. {
  73. return sources[0].Subscribe(observer);
  74. }
  75. var parent = new AmbCoordinator<T>(observer, n);
  76. parent.Subscribe(sources);
  77. return parent;
  78. }
  79. internal void Subscribe(IObservable<T>[] sources)
  80. {
  81. for (var i = 0; i < observers.Length; i++)
  82. {
  83. var inner = Volatile.Read(ref observers[i]);
  84. if (inner == null)
  85. {
  86. break;
  87. }
  88. inner.OnSubscribe(sources[i].Subscribe(inner));
  89. }
  90. }
  91. public void Dispose()
  92. {
  93. for (var i = 0; i < observers.Length; i++)
  94. {
  95. Interlocked.Exchange(ref observers[i], null)?.Dispose();
  96. }
  97. }
  98. bool TryWin(int index)
  99. {
  100. if (Volatile.Read(ref winner) == -1 && Interlocked.CompareExchange(ref winner, index, -1) == -1)
  101. {
  102. for (var i = 0; i < observers.Length; i++)
  103. {
  104. if (index != i)
  105. {
  106. Interlocked.Exchange(ref observers[i], null)?.Dispose();
  107. }
  108. }
  109. return true;
  110. }
  111. return false;
  112. }
  113. internal sealed class InnerObserver : IObserver<T>, IDisposable
  114. {
  115. readonly IObserver<T> downstream;
  116. readonly AmbCoordinator<T> parent;
  117. readonly int index;
  118. IDisposable upstream;
  119. bool won;
  120. public InnerObserver(AmbCoordinator<T> parent, int index)
  121. {
  122. this.downstream = parent.downstream;
  123. this.parent = parent;
  124. this.index = index;
  125. }
  126. public void Dispose()
  127. {
  128. Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
  129. }
  130. public void OnCompleted()
  131. {
  132. if (won)
  133. {
  134. downstream.OnCompleted();
  135. }
  136. else
  137. if (parent.TryWin(index))
  138. {
  139. won = true;
  140. downstream.OnCompleted();
  141. }
  142. Dispose();
  143. }
  144. public void OnError(Exception error)
  145. {
  146. if (won)
  147. {
  148. downstream.OnError(error);
  149. }
  150. else
  151. if (parent.TryWin(index))
  152. {
  153. won = true;
  154. downstream.OnError(error);
  155. }
  156. Dispose();
  157. }
  158. public void OnNext(T value)
  159. {
  160. if (won)
  161. {
  162. downstream.OnNext(value);
  163. }
  164. else
  165. if (parent.TryWin(index))
  166. {
  167. won = true;
  168. downstream.OnNext(value);
  169. } else
  170. {
  171. Dispose();
  172. }
  173. }
  174. internal void OnSubscribe(IDisposable d)
  175. {
  176. if (Interlocked.CompareExchange(ref upstream, d, null) != null)
  177. {
  178. d?.Dispose();
  179. }
  180. }
  181. }
  182. }
  183. }