Amb.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerableEx
  11. {
  /// <summary>
  /// Races two async-enumerable sequences against each other and relays the one whose first
  /// <c>MoveNextAsync</c> call completes first. The other sequence is disposed.
  /// </summary>
  /// <typeparam name="TSource">Element type of both candidate sequences.</typeparam>
  /// <param name="first">First candidate sequence; rejected through <c>Error.ArgumentNull</c> when null.</param>
  /// <param name="second">Second candidate sequence; rejected through <c>Error.ArgumentNull</c> when null.</param>
  /// <returns>A sequence that yields the elements of the winning candidate.</returns>
  public static IAsyncEnumerable<TSource> Amb<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  {
      if (first == null)
      {
          throw Error.ArgumentNull(nameof(first));
      }

      if (second == null)
      {
          throw Error.ArgumentNull(nameof(second));
      }

#if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          IAsyncEnumerator<TSource> leftEnumerator = null;
          IAsyncEnumerator<TSource> rightEnumerator = null;
          Task<bool> leftPending = null;
          Task<bool> rightPending = null;

          try
          {
              //
              // REVIEW: Both candidates are always started. A possible alternative is to stay with the
              //         first candidate when it already has a value (or exception) available, much like
              //         Task.WhenAny does (see CommonCWAnyLogic in dotnet/coreclr). A dedicated WhenAny
              //         combinator could do exactly that and could even avoid the AsTask conversion.
              //
              leftEnumerator = first.GetAsyncEnumerator(cancellationToken);
              leftPending = leftEnumerator.MoveNextAsync().AsTask();

              //
              // REVIEW: GetAsyncEnumerator/MoveNextAsync are issued in pairs per candidate (rather than
              //         in two phases) so that this overload matches the N-ary overload. This ordering
              //         differs from the original implementation.
              //
              rightEnumerator = second.GetAsyncEnumerator(cancellationToken);
              rightPending = rightEnumerator.MoveNextAsync().AsTask();
          }
          catch
          {
              // NB: AwaitMoveNextAsyncAndDispose tolerates null for either argument, so whatever was
              //     (or was not) started before the failure can be passed along without extra checks.
              Task[] pendingCleanup =
              {
                  AwaitMoveNextAsyncAndDispose(rightPending, rightEnumerator),
                  AwaitMoveNextAsyncAndDispose(leftPending, leftEnumerator)
              };

              await Task.WhenAll(pendingCleanup).ConfigureAwait(false);

              throw;
          }

          //
          // REVIEW: The WhenAny combinator defined for Merge in TaskExt could be used here instead,
          //         removing the need to convert to Task<bool> before calling Task.WhenAny.
          //
          var firstToFinish = await Task.WhenAny(leftPending, rightPending).ConfigureAwait(false);

          //
          // REVIEW: Calling DisposeAsync on the loser right away and awaiting it is an alternative,
          //         but has two drawbacks:
          //
          //         1. DisposeAsync would run concurrently with an in-flight MoveNextAsync.
          //         2. The elected winner could not yield results until the loser unblocks.
          //
          var leftWon = firstToFinish == leftPending;
          var winner = leftWon ? leftEnumerator : rightEnumerator;
          var disposeLoser = leftWon
              ? AwaitMoveNextAsyncAndDispose(rightPending, rightEnumerator)
              : AwaitMoveNextAsyncAndDispose(leftPending, leftEnumerator);

          try
          {
              try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
              {
                  // The first iteration observes the outcome of the MoveNextAsync call that won the
                  // race; subsequent iterations pull from the winner directly.
                  var hasElement = await firstToFinish.ConfigureAwait(false);

                  while (hasElement)
                  {
                      yield return winner.Current;

                      hasElement = await winner.MoveNextAsync().ConfigureAwait(false);
                  }
              }
              finally
              {
                  await winner.DisposeAsync().ConfigureAwait(false);
              }
          }
          finally
          {
              //
              // REVIEW: Unlike the original implementation, in-flight asynchronous operations are never
              //         discarded. As a consequence, an exception raised while enumerating the winner
              //         can be subsumed by an exception thrown while cleaning up the loser, and a
              //         long-blocking loser transfers its blocking behavior to the resulting sequence.
              //         Never discarding a non-completed task seems a sound general design tenet though;
              //         fire-and-forget disposal could be offered as a separate "unsafe" operator so the
              //         user opts in explicitly. This is a breaking change for Ix Async.
              //
              await disposeLoser.ConfigureAwait(false);
          }
      }
#else
      return new AmbAsyncIterator<TSource>(first, second);
#endif
  }
  /// <summary>
  /// Races any number of async-enumerable sequences and relays the one whose first
  /// <c>MoveNextAsync</c> call completes first. All other sequences are disposed.
  /// </summary>
  /// <typeparam name="TSource">Element type of the candidate sequences.</typeparam>
  /// <param name="sources">Candidate sequences; rejected through <c>Error.ArgumentNull</c> when the array is null.</param>
  /// <returns>A sequence that yields the elements of the winning candidate.</returns>
  public static IAsyncEnumerable<TSource> Amb<TSource>(params IAsyncEnumerable<TSource>[] sources)
  {
      if (sources == null)
      {
          throw Error.ArgumentNull(nameof(sources));
      }

#if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          //
          // REVIEW: See the remarks on the binary overload for changes compared to the original.
          //
          var count = sources.Length;

          var enumerators = new IAsyncEnumerator<TSource>[count];
          var pending = new Task<bool>[count];

          try
          {
              // Start every candidate: obtain its enumerator and immediately request its first element.
              for (var index = 0; index < count; index++)
              {
                  enumerators[index] = sources[index].GetAsyncEnumerator(cancellationToken);
                  pending[index] = enumerators[index].MoveNextAsync().AsTask();
              }
          }
          catch
          {
              // Slots that were never reached are still null; AwaitMoveNextAsyncAndDispose skips those.
              var cleanup = new Task[count];

              for (var index = 0; index < count; index++)
              {
                  cleanup[index] = AwaitMoveNextAsyncAndDispose(pending[index], enumerators[index]);
              }

              await Task.WhenAll(cleanup).ConfigureAwait(false);

              throw;
          }

          var firstToFinish = await Task.WhenAny(pending).ConfigureAwait(false);

          //
          // NB: Using IndexOf is fine. If candidate N completed with a ValueTask<bool> equivalent to the
          //     one of candidate M (where M < N), AsTask may hand back the same Task reference for both
          //     (e.g. due to caching of completed Boolean tasks). IndexOf then reports M rather than N,
          //     but both have an equivalent completion state because they are reference equal. The only
          //     effect is a left-bias when electing the winner, which is acceptable given Amb's
          //     "ambiguous" nature.
          //
          var winnerIndex = Array.IndexOf(pending, firstToFinish);
          var winner = enumerators[winnerIndex];

          var loserCleanups = new List<Task>(count - 1);

          for (var index = 0; index < count; index++)
          {
              if (index == winnerIndex)
              {
                  continue;
              }

              loserCleanups.Add(AwaitMoveNextAsyncAndDispose(pending[index], enumerators[index]));
          }

          var cleanupLosers = Task.WhenAll(loserCleanups);

          try
          {
              try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
              {
                  // The first iteration observes the outcome of the MoveNextAsync call that won the
                  // race; subsequent iterations pull from the winner directly.
                  var hasElement = await firstToFinish.ConfigureAwait(false);

                  while (hasElement)
                  {
                      yield return winner.Current;

                      hasElement = await winner.MoveNextAsync().ConfigureAwait(false);
                  }
              }
              finally
              {
                  await winner.DisposeAsync().ConfigureAwait(false);
              }
          }
          finally
          {
              // Losers are only awaited here, after the winner has been drained and disposed.
              await cleanupLosers.ConfigureAwait(false);
          }
      }
#else
      return new AmbAsyncIteratorN<TSource>(sources);
#endif
  }
  /// <summary>
  /// Races the given async-enumerable sequences. The collection is copied to an array when this
  /// method is called, and the array is then handed to the N-ary implementation.
  /// </summary>
  /// <typeparam name="TSource">Element type of the candidate sequences.</typeparam>
  /// <param name="sources">Candidate sequences; rejected through <c>Error.ArgumentNull</c> when null.</param>
  /// <returns>A sequence that yields the elements of the winning candidate.</returns>
  public static IAsyncEnumerable<TSource> Amb<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      if (sources == null)
      {
          throw Error.ArgumentNull(nameof(sources));
      }

      // Materialize eagerly; both build flavors operate on the same array snapshot.
      var snapshot = sources.ToArray();

#if USE_ASYNC_ITERATOR
      return Amb(snapshot);
#else
      return new AmbAsyncIteratorN<TSource>(snapshot);
#endif
  }
  209. #if USE_ASYNC_ITERATOR
  /// <summary>
  /// Waits for an outstanding <c>MoveNextAsync</c> operation (if any) to finish and then disposes
  /// the enumerator. Disposal happens even when awaiting the operation throws.
  /// </summary>
  /// <typeparam name="T">Element type of the enumerator.</typeparam>
  /// <param name="moveNextAsync">The in-flight move-next task, or null when none was started.</param>
  /// <param name="enumerator">The enumerator to dispose; when null the method does nothing.</param>
  /// <returns>A task that completes once the enumerator has been disposed.</returns>
  private static async Task AwaitMoveNextAsyncAndDispose<T>(Task<bool> moveNextAsync, IAsyncEnumerator<T> enumerator)
  {
      // Nothing was acquired, so there is nothing to wait for or release.
      if (enumerator == null)
      {
          return;
      }

      try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
      {
          // Let a pending MoveNextAsync run to completion before disposing the enumerator.
          if (moveNextAsync != null)
          {
              await moveNextAsync.ConfigureAwait(false);
          }
      }
      finally
      {
          await enumerator.DisposeAsync().ConfigureAwait(false);
      }
  }
  227. #endif
  228. #if !USE_ASYNC_ITERATOR
  /// <summary>
  /// Hand-written iterator behind the binary <c>Amb</c> overload, used when
  /// <c>USE_ASYNC_ITERATOR</c> is not defined. It relays whichever of the two sources completes
  /// its first <c>MoveNextAsync</c> call first.
  /// </summary>
  private sealed class AmbAsyncIterator<TSource> : AsyncIterator<TSource>
  {
      private readonly IAsyncEnumerable<TSource> _first;
      private readonly IAsyncEnumerable<TSource> _second;

      // Enumerator of the elected winner; stays null until the race has been decided.
      private IAsyncEnumerator<TSource> _enumerator;

      public AmbAsyncIterator(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
      {
          Debug.Assert(first != null);
          Debug.Assert(second != null);

          _first = first;
          _second = second;
      }

      /// <summary>Creates a fresh iterator over the same two sources.</summary>
      public override AsyncIteratorBase<TSource> Clone()
      {
          return new AmbAsyncIterator<TSource>(_first, _second);
      }

      /// <summary>Disposes the winner's enumerator (when one was elected), then the base iterator.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// On the first call, starts both sources and elects a winner. On later calls, advances the
      /// winner. Disposes this iterator and returns false once no further element is available.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
              {
                  //
                  // REVIEW: Exceptions in any of these steps don't cause cleanup. This has been fixed
                  //         in the new implementation.
                  //
                  var firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);
                  var secondEnumerator = _second.GetAsyncEnumerator(_cancellationToken);

                  var firstMoveNext = firstEnumerator.MoveNextAsync().AsTask();
                  var secondMoveNext = secondEnumerator.MoveNextAsync().AsTask();

                  var fastest = await Task.WhenAny(firstMoveNext, secondMoveNext).ConfigureAwait(false);

                  //
                  // REVIEW: Calling DisposeAsync on the loser and awaiting it is an alternative, but has
                  //         two drawbacks:
                  //
                  //         1. DisposeAsync would run concurrently with an in-flight MoveNextAsync.
                  //         2. The elected winner could not yield results until the loser unblocks.
                  //
                  //         The approach below has one drawback: exceptions raised by the loser are
                  //         dropped on the floor.
                  //
                  var firstWon = fastest == firstMoveNext;

                  _enumerator = firstWon ? firstEnumerator : secondEnumerator;

                  var loserEnumerator = firstWon ? secondEnumerator : firstEnumerator;
                  var loserMoveNext = firstWon ? secondMoveNext : firstMoveNext;

                  // Fire-and-forget: dispose the loser once its pending MoveNextAsync has finished.
                  _ = loserMoveNext.ContinueWith(_ =>
                  {
                      loserEnumerator.DisposeAsync();
                  });

                  _state = AsyncIteratorState.Iterating;

                  if (await fastest.ConfigureAwait(false))
                  {
                      _current = _enumerator.Current;
                      return true;
                  }

                  break;
              }

              case AsyncIteratorState.Iterating:
              {
                  if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      _current = _enumerator.Current;
                      return true;
                  }

                  break;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Hand-written iterator behind the N-ary <c>Amb</c> overloads, used when
  /// <c>USE_ASYNC_ITERATOR</c> is not defined. It relays whichever source completes its first
  /// <c>MoveNextAsync</c> call first.
  /// </summary>
  private sealed class AmbAsyncIteratorN<TSource> : AsyncIterator<TSource>
  {
      private readonly IAsyncEnumerable<TSource>[] _sources;

      // Enumerator of the elected winner; stays null until the race has been decided.
      private IAsyncEnumerator<TSource> _enumerator;

      public AmbAsyncIteratorN(IAsyncEnumerable<TSource>[] sources)
      {
          Debug.Assert(sources != null);

          _sources = sources;
      }

      /// <summary>Creates a fresh iterator over the same source array.</summary>
      public override AsyncIteratorBase<TSource> Clone()
      {
          return new AmbAsyncIteratorN<TSource>(_sources);
      }

      /// <summary>Disposes the winner's enumerator (when one was elected), then the base iterator.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// On the first call, starts all sources and elects a winner. On later calls, advances the
      /// winner. Disposes this iterator and returns false once no further element is available.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  var n = _sources.Length;

                  var enumerators = new IAsyncEnumerator<TSource>[n];
                  var moveNexts = new Task<bool>[n];

                  for (var i = 0; i < n; i++)
                  {
                      var enumerator = _sources[i].GetAsyncEnumerator(_cancellationToken);

                      enumerators[i] = enumerator;
                      moveNexts[i] = enumerator.MoveNextAsync().AsTask();
                  }

                  var winner = await Task.WhenAny(moveNexts).ConfigureAwait(false);

                  //
                  // REVIEW: An alternative option is to call DisposeAsync on the other and await it, but this has two drawbacks:
                  //
                  //         1. Concurrent DisposeAsync while a MoveNextAsync is in flight.
                  //         2. The winner elected by Amb is blocked to yield results until all losers unblock.
                  //
                  //         The approach below has one drawback, namely that exceptions raised by any loser are dropped on the floor.
                  //

                  var winnerIndex = Array.IndexOf(moveNexts, winner);

                  _enumerator = enumerators[winnerIndex];

                  for (var i = 0; i < n; i++)
                  {
                      if (i == winnerIndex)
                      {
                          continue;
                      }

                      //
                      // NB: The continuation must not capture the loop variable `i`. A `for` loop has a
                      //     single `i` shared by all iterations, and the continuation runs later, when
                      //     `i` has typically advanced to `n` (out of range) or to some other index
                      //     (wrong enumerator). Copying the enumerator into a local declared inside the
                      //     loop body gives every continuation its own captured value.
                      //
                      var loser = enumerators[i];

                      // Fire-and-forget: dispose the loser once its pending MoveNextAsync has finished.
                      _ = moveNexts[i].ContinueWith(_ =>
                      {
                          loser.DisposeAsync();
                      });
                  }

                  _state = AsyncIteratorState.Iterating;

                  if (await winner.ConfigureAwait(false))
                  {
                      _current = _enumerator.Current;
                      return true;
                  }

                  break;

              case AsyncIteratorState.Iterating:
                  if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      _current = _enumerator.Current;
                      return true;
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  386. #endif
  387. }
  388. }