// Amb.cs
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Reactive.Linq
  10. {
  11. public partial class AsyncObservable
  12. {
  /// <summary>
  /// Races two async observable sequences and propagates the one that reacts first.
  /// Whichever source delivers a notification first is forwarded to the observer; the subscription
  /// slot of the other source is disposed.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
  /// <param name="first">The first sequence taking part in the race.</param>
  /// <param name="second">The second sequence taking part in the race.</param>
  /// <returns>An async observable sequence that forwards the notifications of the winning source.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is <c>null</c>.</exception>
  public static IAsyncObservable<TSource> Amb<TSource>(this IAsyncObservable<TSource> first, IAsyncObservable<TSource> second)
  {
      if (first == null)
          throw new ArgumentNullException(nameof(first));
      if (second == null)
          throw new ArgumentNullException(nameof(second));

      return CreateAsyncObservable<TSource>.From(
          first,
          second,
          static async (first, second, observer) =>
          {
              var firstSubscription = new SingleAssignmentAsyncDisposable();
              var secondSubscription = new SingleAssignmentAsyncDisposable();

              var (firstObserver, secondObserver) = AsyncObserver.Amb(observer, firstSubscription, secondSubscription);

              // Start both subscriptions before awaiting either one so the sources race each other.
              var firstTask = SubscribeAndAssignAsync(first, firstObserver, firstSubscription);
              var secondTask = SubscribeAndAssignAsync(second, secondObserver, secondSubscription);

              await Task.WhenAll(firstTask, secondTask).ConfigureAwait(false);

              return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);

              // Subscribes to a source and stores the resulting subscription in its slot.
              // Awaiting (instead of ContinueWith + Result + Unwrap) avoids the ambient TaskScheduler
              // and surfaces subscribe failures without an extra AggregateException wrapper.
              static async Task SubscribeAndAssignAsync(IAsyncObservable<TSource> source, IAsyncObserver<TSource> sink, SingleAssignmentAsyncDisposable slot)
              {
                  var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
                  await slot.AssignAsync(subscription).ConfigureAwait(false);
              }
          });
  }
  /// <summary>
  /// Races all async observable sequences in <paramref name="sources"/> and propagates the one that reacts first.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
  /// <param name="sources">The sequences taking part in the race; enumerated once, eagerly, into an array.</param>
  /// <returns>An async observable sequence that forwards the notifications of the winning source.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is <c>null</c>.</exception>
  public static IAsyncObservable<TSource> Amb<TSource>(this IEnumerable<IAsyncObservable<TSource>> sources)
  {
      // Guard here so the exception names this method's parameter, matching the other overloads,
      // instead of leaking the parameter name of Enumerable.ToArray.
      if (sources == null)
          throw new ArgumentNullException(nameof(sources));

      return Amb(sources.ToArray());
  }
  /// <summary>
  /// Races all given async observable sequences and propagates the one that reacts first.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
  /// <param name="sources">The sequences taking part in the race.</param>
  /// <returns>An async observable sequence that forwards the notifications of the winning source.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is <c>null</c>.</exception>
  public static IAsyncObservable<TSource> Amb<TSource>(params IAsyncObservable<TSource>[] sources)
  {
      if (sources == null)
          throw new ArgumentNullException(nameof(sources));

      return Create<TSource>(async observer =>
      {
          var count = sources.Length;

          // One assignable subscription slot per source; the observers dispose the losers' slots.
          var subscriptions = new SingleAssignmentAsyncDisposable[count];

          for (var i = 0; i < count; i++)
          {
              subscriptions[i] = new SingleAssignmentAsyncDisposable();
          }

          var observers = AsyncObserver.Amb(observer, subscriptions);

          var tasks = new Task[count];

          for (var i = 0; i < count; i++)
          {
              // The slot is passed as an argument, so it is bound at call time. A lambda capturing
              // the loop variable `i` would read it after the loop had advanced and could assign
              // to the wrong slot or index past the end of the array.
              tasks[i] = SubscribeAndAssignAsync(sources[i], observers[i], subscriptions[i]);
          }

          await Task.WhenAll(tasks).ConfigureAwait(false);

          return StableCompositeAsyncDisposable.Create(subscriptions);

          // Subscribes to a source and stores the resulting subscription in its slot.
          static async Task SubscribeAndAssignAsync(IAsyncObservable<TSource> source, IAsyncObserver<TSource> sink, SingleAssignmentAsyncDisposable slot)
          {
              var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
              await slot.AssignAsync(subscription).ConfigureAwait(false);
          }
      });
  }
  56. }
  57. public partial class AsyncObserver
  58. {
  /// <summary>
  /// Builds a pair of observers that race to forward notifications to <paramref name="observer"/>.
  /// The side that receives any notification (element, error, or completion) while no winner has been
  /// chosen becomes the winner and the other side's disposable is disposed. From then on only the
  /// winner's notifications reach <paramref name="observer"/>.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements received by the observers.</typeparam>
  /// <param name="observer">The observer that receives the winner's notifications.</param>
  /// <param name="first">Disposable that is disposed when the second observer wins.</param>
  /// <param name="second">Disposable that is disposed when the first observer wins.</param>
  /// <returns>A tuple holding the observer for the first side and the observer for the second side, in that order.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static (IAsyncObserver<TSource>, IAsyncObserver<TSource>) Amb<TSource>(IAsyncObserver<TSource> observer, IAsyncDisposable first, IAsyncDisposable second)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (first == null)
          throw new ArgumentNullException(nameof(first));
      if (second == null)
          throw new ArgumentNullException(nameof(second));

      // Shared by both observers: the gate serializes every callback, the state records the winner.
      var gate = new AsyncGate();
      var state = AmbState.None;

      return (BuildObserver(AmbState.First, second), BuildObserver(AmbState.Second, first));

      // Call only while holding the gate. Claims the win for `side` when nobody has won yet
      // (disposing the rival), then reports whether `side` is the winner.
      async Task<bool> OwnsOutputAsync(AmbState side, IAsyncDisposable rival)
      {
          if (state == AmbState.None)
          {
              state = side;
              await rival.DisposeAsync().ConfigureAwait(false);
          }

          return state == side;
      }

      // Creates the observer for one side; all three callbacks follow the same
      // lock / elect / forward-if-winner sequence.
      IAsyncObserver<TSource> BuildObserver(AmbState side, IAsyncDisposable rival) =>
          Create<TSource>(
              async x =>
              {
                  using var guard = await gate.LockAsync().ConfigureAwait(false);

                  if (await OwnsOutputAsync(side, rival).ConfigureAwait(false))
                  {
                      await observer.OnNextAsync(x).ConfigureAwait(false);
                  }
              },
              async ex =>
              {
                  using var guard = await gate.LockAsync().ConfigureAwait(false);

                  if (await OwnsOutputAsync(side, rival).ConfigureAwait(false))
                  {
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              async () =>
              {
                  using var guard = await gate.LockAsync().ConfigureAwait(false);

                  if (await OwnsOutputAsync(side, rival).ConfigureAwait(false))
                  {
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              }
          );
  }
  /// <summary>
  /// Builds one observer per entry of <paramref name="subscriptions"/>; the observers race to forward
  /// notifications to <paramref name="observer"/>. The observer that receives any notification
  /// (element, error, or completion) while no winner has been chosen becomes the winner, and every
  /// other entry of <paramref name="subscriptions"/> is disposed. From then on only the winner's
  /// notifications reach <paramref name="observer"/>.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements received by the observers.</typeparam>
  /// <param name="observer">The observer that receives the winner's notifications.</param>
  /// <param name="subscriptions">Disposables, one per participant; entry <c>i</c> belongs to returned observer <c>i</c>.</param>
  /// <returns>An array of observers with the same length as <paramref name="subscriptions"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="subscriptions"/> is <c>null</c>.</exception>
  public static IAsyncObserver<TSource>[] Amb<TSource>(IAsyncObserver<TSource> observer, IAsyncDisposable[] subscriptions)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (subscriptions == null)
          throw new ArgumentNullException(nameof(subscriptions));

      // Shared by all observers: the gate serializes every callback, `winner` holds the winning index.
      var gate = new AsyncGate();
      int? winner = null;
      var total = subscriptions.Length;

      return Enumerable.Range(0, total).Select(i => CreateObserver(i)).ToArray();

      // Call only while holding the gate. Claims the win for `candidate` when nobody has won yet,
      // starts disposal of every other subscription in index order and awaits them together,
      // then reports whether `candidate` is the winner.
      async Task<bool> ClaimAsync(int candidate)
      {
          if (winner == null)
          {
              winner = candidate;

              var losers = Enumerable.Range(0, total)
                  .Where(i => i != candidate)
                  .Select(i => subscriptions[i].DisposeAsync().AsTask())
                  .ToList();

              await Task.WhenAll(losers).ConfigureAwait(false);
          }

          return winner == candidate;
      }

      // Creates the observer for participant `index`; all three callbacks follow the same
      // lock / elect / forward-if-winner sequence.
      IAsyncObserver<TSource> CreateObserver(int index) =>
          Create<TSource>(
              async x =>
              {
                  using var guard = await gate.LockAsync().ConfigureAwait(false);

                  if (await ClaimAsync(index).ConfigureAwait(false))
                  {
                      await observer.OnNextAsync(x).ConfigureAwait(false);
                  }
              },
              async ex =>
              {
                  using var guard = await gate.LockAsync().ConfigureAwait(false);

                  if (await ClaimAsync(index).ConfigureAwait(false))
                  {
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              async () =>
              {
                  using var guard = await gate.LockAsync().ConfigureAwait(false);

                  if (await ClaimAsync(index).ConfigureAwait(false))
                  {
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              }
          );
  }
  /// <summary>
  /// Outcome of the race tracked by the two-observer <c>Amb</c> overload.
  /// </summary>
  private enum AmbState
  {
      /// <summary>No side has received a notification yet.</summary>
      None = 0,

      /// <summary>The first side received a notification first and owns the output.</summary>
      First = 1,

      /// <summary>The second side received a notification first and owns the output.</summary>
      Second = 2,
  }
  247. }
  248. }