Merge.cs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerableEx
  11. {
  /// <summary>
  /// Merges the elements of all of the specified async-enumerable sequences into a single async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequences and in the merged sequence.</typeparam>
  /// <param name="sources">The sequences whose elements are merged.</param>
  /// <returns>
  /// A sequence that acquires an enumerator from every source, starts a <c>MoveNextAsync</c> on each of them, and
  /// yields elements as those operations complete. The implementation is selected at compile time by the
  /// <c>USE_FAIR_AND_CHEAPER_MERGE</c> and <c>USE_ASYNC_ITERATOR</c> symbols.
  /// </returns>
  /// <remarks>Throws the exception created by <c>Error.ArgumentNull</c> when <paramref name="sources"/> is null.</remarks>
  public static IAsyncEnumerable<TSource> Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)
  {
      if (sources == null)
      {
          throw Error.ArgumentNull(nameof(sources));
      }

#if USE_FAIR_AND_CHEAPER_MERGE
      //
      // Compared to the original Merge implementation, this variant is:
      //
      // - cheaper, because
      //   - ValueTask<bool> is never converted to Task<bool> through AsTask, and
      //   - no Task.WhenAny task is instantiated per iteration;
      // - fairer, because
      //   - the MoveNextAsync tasks are awaited concurrently but their completions are queued, rather than
      //     awaiting a fresh WhenAny task in which "left" sources get preferential treatment over "right" ones.
      //
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          var sourceCount = sources.Length;

          var iterators = new IAsyncEnumerator<TSource>[sourceCount];
          var pending = new ValueTask<bool>[sourceCount];

          try
          {
              for (var slot = 0; slot < sourceCount; slot++)
              {
                  IAsyncEnumerator<TSource> iterator = sources[slot].GetAsyncEnumerator(cancellationToken);
                  iterators[slot] = iterator;

                  // REVIEW: Like the original implementation, MoveNextAsync is kicked off right away. Doing this in
                  //         a separate stage instead would prevent concurrency between MoveNextAsync and
                  //         GetAsyncEnumerator calls, and would avoid any MoveNextAsync call before all enumerators
                  //         have been acquired (or acquiring one has failed).
                  pending[slot] = iterator.MoveNextAsync();
              }

              var completions = TaskExt.WhenAny(pending);

              int remaining = sourceCount;

              while (remaining > 0)
              {
                  int ready = await completions;

                  IAsyncEnumerator<TSource> iterator = iterators[ready];
                  ValueTask<bool> readyTask = pending[ready];

                  if (await readyTask.ConfigureAwait(false))
                  {
                      // Read the element before advancing the enumerator again.
                      TSource item = iterator.Current;

                      //
                      // Replace writes the new task into the pending array and starts awaiting it. There is no
                      // need to store it ourselves: the completions object refers to the same array (no copy).
                      //
                      completions.Replace(ready, iterator.MoveNextAsync());

                      yield return item;
                      continue;
                  }

                  //
                  // The source is exhausted. Put a completed placeholder task in its slot so the cleanup logic in
                  // the finally block stays simple; because the completions object shares the array, this also
                  // lets go of whatever the finished task was holding onto. Replace is deliberately *not* used
                  // here: it would attach an awaiter to an already completed task and cause spurious wake-ups
                  // when awaiting the completions object.
                  //
                  pending[ready] = default(ValueTask<bool>);

                  // REVIEW: The original implementation did not dispose eagerly, which could leak resources when
                  //         merging with other long-running sequences.
                  iterators[ready] = null; // NB: Prevents a second dispose attempt in finally if this dispose fails.
                  await iterator.DisposeAsync().ConfigureAwait(false);

                  remaining--;
              }
          }
          finally
          {
              // REVIEW: The original implementation disposes concurrently, which adds an uncontrollable source of
              //         concurrency next to the sequential resource acquisition. Here resources are released in
              //         the opposite order of acquisition, which is deterministic and mimics a series of nested
              //         `await using` statements.

              // REVIEW: If GetAsyncEnumerator and the initial MoveNextAsync calls ever get phased, this should
              //         become symmetric: first await all in-flight MoveNextAsync operations, then dispose.

              List<Exception> failures = null;

              for (var slot = sourceCount - 1; slot >= 0; slot--)
              {
                  ValueTask<bool> outstanding = pending[slot];
                  IAsyncEnumerator<TSource> iterator = iterators[slot];

                  try
                  {
                      try
                      {
                          //
                          // Let outstanding work finish before disposing. Enumerators that already finished need
                          // no special treatment because a completed placeholder task was swapped in for them.
                          //
                          // REVIEW: This attaches one more continuation to every pending task (the whenAny object
                          //         registered one too); that object becomes collectible once all of them
                          //         complete. Draining through whenAny until the active count reaches 0 would
                          //         avoid the extra continuations, but then the dispose order has to be decided.
                          //         Today it is the reverse of acquisition, except for enumerators that were
                          //         disposed eagerly upon early completion. Should dispose order matter at all?
                          //
                          _ = await outstanding.ConfigureAwait(false);
                      }
                      finally
                      {
                          if (iterator != null)
                          {
                              await iterator.DisposeAsync().ConfigureAwait(false);
                          }
                      }
                  }
                  catch (Exception ex)
                  {
                      (failures ??= new List<Exception>()).Add(ex);
                  }
              }

              // NB: Errors raised during cleanup (including awaiting pending operations) are thrown instead of the
              //     exception that may have caused the finally block to run, just like throwing from any finally
              //     block; the difference is that all exceptions are caught first so that every merged sequence
              //     gets cleaned up.
              if (failures != null)
              {
                  throw new AggregateException(failures);
              }
          }
      }
#elif USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          var sourceCount = sources.Length;

          var iterators = new IAsyncEnumerator<TSource>[sourceCount];
          var pending = new Task<bool>[sourceCount];

          try
          {
              for (var slot = 0; slot < sourceCount; slot++)
              {
                  IAsyncEnumerator<TSource> iterator = sources[slot].GetAsyncEnumerator(cancellationToken);
                  iterators[slot] = iterator;

                  // REVIEW: Like the original implementation, MoveNextAsync is kicked off right away. Doing this in
                  //         a separate stage instead would prevent concurrency between MoveNextAsync and
                  //         GetAsyncEnumerator calls, and would avoid any MoveNextAsync call before all enumerators
                  //         have been acquired (or acquiring one has failed).
                  pending[slot] = iterator.MoveNextAsync().AsTask();
              }

              int remaining = sourceCount;

              while (remaining > 0)
              {
                  // REVIEW: Calling WhenAny repeatedly like this may be a performance issue. It should be measured;
                  //         operating directly on the ValueTask<bool> objects would also avoid the Task<bool>
                  //         allocations caused by AsTask.
                  var winner = await Task.WhenAny(pending).ConfigureAwait(false);

                  // REVIEW: This seems wrong. AsTask can return the original Task<bool> (when the ValueTask<bool>
                  //         wraps one) or a singleton instance for true and false, so IndexOf may pick an element
                  //         closer to the start of the array due to reference equality and aliasing. See
                  //         GetTaskForResult in the BCL.
                  int ready = Array.IndexOf(pending, winner);

                  IAsyncEnumerator<TSource> iterator = iterators[ready];

                  if (await winner.ConfigureAwait(false))
                  {
                      // Read the element before advancing the enumerator again.
                      TSource item = iterator.Current;

                      pending[ready] = iterator.MoveNextAsync().AsTask();

                      yield return item;
                      continue;
                  }

                  // The source is exhausted; park its slot on the TaskExt.Never placeholder.
                  pending[ready] = TaskExt.Never;

                  // REVIEW: The original implementation did not dispose eagerly, which could leak resources when
                  //         merging with other long-running sequences.
                  iterators[ready] = null; // NB: Prevents a second dispose attempt in finally if this dispose fails.
                  await iterator.DisposeAsync().ConfigureAwait(false);

                  remaining--;
              }
          }
          finally
          {
              // REVIEW: The original implementation disposes concurrently, which adds an uncontrollable source of
              //         concurrency next to the sequential resource acquisition. Here resources are released in
              //         the opposite order of acquisition, which is deterministic and mimics a series of nested
              //         `await using` statements.

              // REVIEW: If GetAsyncEnumerator and the initial MoveNextAsync calls ever get phased, this should
              //         become symmetric: first await all in-flight MoveNextAsync operations, then dispose.

              List<Exception> failures = null;

              for (var slot = sourceCount - 1; slot >= 0; slot--)
              {
                  Task<bool> outstanding = pending[slot];
                  IAsyncEnumerator<TSource> iterator = iterators[slot];

                  try
                  {
                      try
                      {
                          // Slots never filled (null) and slots parked on TaskExt.Never have nothing to wait for.
                          if (!(outstanding == null || outstanding == TaskExt.Never))
                          {
                              _ = await outstanding.ConfigureAwait(false);
                          }
                      }
                      finally
                      {
                          if (iterator != null)
                          {
                              await iterator.DisposeAsync().ConfigureAwait(false);
                          }
                      }
                  }
                  catch (Exception ex)
                  {
                      (failures ??= new List<Exception>()).Add(ex);
                  }
              }

              // NB: Errors raised during cleanup (including awaiting pending operations) are thrown instead of the
              //     exception that may have caused the finally block to run, just like throwing from any finally
              //     block; the difference is that all exceptions are caught first so that every merged sequence
              //     gets cleaned up.
              if (failures != null)
              {
                  throw new AggregateException(failures);
              }
          }
      }
#else
      return new MergeAsyncIterator<TSource>(sources);
#endif
  }
  /// <summary>
  /// Merges the elements of all async-enumerable sequences in the given enumerable sequence into a single
  /// async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequences and in the merged sequence.</typeparam>
  /// <param name="sources">The enumerable sequence of async-enumerable sequences to merge.</param>
  /// <returns>The result of converting <paramref name="sources"/> to an async-enumerable sequence and flattening it with <c>SelectMany</c>.</returns>
  /// <remarks>Throws the exception created by <c>Error.ArgumentNull</c> when <paramref name="sources"/> is null.</remarks>
  public static IAsyncEnumerable<TSource> Merge<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      if (sources == null)
      {
          throw Error.ArgumentNull(nameof(sources));
      }

      //
      // REVIEW: No concurrency is exploited here. Introducing it would be a breaking change, so it should not be
      //         done in place; a parallel ConcurrentMerge implementation could be added instead. Unfortunately
      //         the Merge overload that takes an array has always been concurrent, so that cannot change either
      //         (which rules out the consistent story where Merge is non-concurrent and ConcurrentMerge is
      //         concurrent). A breaking change to Ix Async could streamline this, but only when shipping with
      //         the BCL interfaces (itself already a breaking change for existing Ix Async users). In that case
      //         the options are:
      //
      //         - All overloads of Merge are concurrent
      //           - and continue to be named Merge, or,
      //           - are renamed to ConcurrentMerge for clarity (likely alongside a ConcurrentZip).
      //         - All overloads of Merge are non-concurrent
      //           - and are simply SelectMany operator macros (maybe more optimized)
      //         - Have ConcurrentMerge next to Merge overloads
      //           - where ConcurrentMerge may need a degree of concurrency parameter (and maybe other options), and,
      //           - where the overload set of both families may be asymmetric
      //
      var outer = sources.ToAsyncEnumerable();

      return outer.SelectMany(inner => inner);
  }
  /// <summary>
  /// Merges the elements of all inner async-enumerable sequences of an async-enumerable sequence into a single
  /// async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the inner sequences and in the merged sequence.</typeparam>
  /// <param name="sources">The async-enumerable sequence of async-enumerable sequences to merge.</param>
  /// <returns>The result of flattening <paramref name="sources"/> with <c>SelectMany</c>.</returns>
  /// <remarks>Throws the exception created by <c>Error.ArgumentNull</c> when <paramref name="sources"/> is null.</remarks>
  public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      //
      // REVIEW: No concurrency is exploited here. Introducing it would be a breaking change, so it should not be
      //         done in place; a parallel ConcurrentMerge implementation could be added instead.
      //
      var outer = sources ?? throw Error.ArgumentNull(nameof(sources));

      return outer.SelectMany(inner => inner);
  }
  /// <summary>
  /// Iterator that merges an array of async-enumerable sequences by keeping one <c>MoveNextAsync</c> task in
  /// flight per source and surfacing elements as <see cref="Task.WhenAny(Task[])"/> reports completions.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
  private sealed class MergeAsyncIterator<TSource> : AsyncIterator<TSource>
  {
      // The sequences to merge; shared with clones, never modified by this class.
      private readonly IAsyncEnumerable<TSource>[] _sources;

      // One enumerator per source. Allocated on the first MoveNextCore call and filled slot by slot, so trailing
      // slots are null if acquiring an enumerator (or starting its first MoveNextAsync) throws.
      private IAsyncEnumerator<TSource>[] _enumerators;

      // The MoveNextAsync task currently associated with each source; TaskExt.Never once a source has finished.
      private Task<bool>[] _moveNexts;

      // Number of sources that have not reported completion yet.
      private int _active;

      /// <summary>Creates an iterator over the given sources.</summary>
      /// <param name="sources">The sequences to merge; expected to be non-null (checked by a debug assertion only).</param>
      public MergeAsyncIterator(IAsyncEnumerable<TSource>[] sources)
      {
          Debug.Assert(sources != null);

          _sources = sources;
      }

      /// <summary>Creates a fresh iterator over the same source array.</summary>
      public override AsyncIteratorBase<TSource> Clone()
      {
          return new MergeAsyncIterator<TSource>(_sources);
      }

      /// <summary>
      /// Disposes every enumerator that was acquired, waits for all of those disposals, and then disposes the base
      /// iterator.
      /// </summary>
      /// <remarks>
      /// Slots that were never filled are skipped. Without that check, a failure while acquiring enumerators in
      /// <see cref="MoveNextCore"/> would make the subsequent dispose throw a <see cref="NullReferenceException"/>,
      /// hiding the original error and leaving the already started disposals unawaited.
      /// </remarks>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerators != null)
          {
              var n = _enumerators.Length;

              var disposes = new List<Task>(n);

              for (var i = 0; i < n; i++)
              {
                  var enumerator = _enumerators[i];

                  // A null slot means acquisition stopped before reaching this source; nothing to dispose.
                  if (enumerator != null)
                  {
                      // NOTE(review): a MoveNextAsync on this enumerator may still be in flight at this point;
                      //               this class does not await it before disposing.
                      disposes.Add(enumerator.DisposeAsync().AsTask());
                  }
              }

              // All disposals are started before any of them is awaited.
              await Task.WhenAll(disposes).ConfigureAwait(false);

              _enumerators = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the merged sequence: on the first call acquires all enumerators and starts a
      /// <c>MoveNextAsync</c> on each; afterwards waits for any pending task to complete and publishes the
      /// corresponding element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when an element was stored in <c>_current</c>; <c>false</c> once every source has finished,
      /// after this iterator has been disposed.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  var n = _sources.Length;

                  _enumerators = new IAsyncEnumerator<TSource>[n];
                  _moveNexts = new Task<bool>[n];
                  _active = n;

                  for (var i = 0; i < n; i++)
                  {
                      var enumerator = _sources[i].GetAsyncEnumerator(_cancellationToken);
                      _enumerators[i] = enumerator;
                      _moveNexts[i] = enumerator.MoveNextAsync().AsTask();
                  }

                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  while (_active > 0)
                  {
                      //
                      // REVIEW: This approach does have a bias towards giving sources on the left
                      //         priority over sources on the right when yielding values. We may
                      //         want to consider a "prefer fairness" option.
                      //
                      var moveNext = await Task.WhenAny(_moveNexts).ConfigureAwait(false);

                      // REVIEW: This seems wrong. AsTask can return the original Task<bool> (if the ValueTask<bool>
                      //         is wrapping one) or return a singleton instance for true and false, at which point
                      //         the use of IndexOf may pick an element closer to the start of the array because of
                      //         reference equality checks and aliasing effects. See GetTaskForResult in the BCL.
                      var index = Array.IndexOf(_moveNexts, moveNext);

                      if (!await moveNext.ConfigureAwait(false))
                      {
                          // The source finished: park its slot on TaskExt.Never and count it as inactive.
                          _moveNexts[index] = TaskExt.Never;
                          _active--;
                      }
                      else
                      {
                          var enumerator = _enumerators[index];

                          // Capture the element before asking the same enumerator for its next one.
                          _current = enumerator.Current;
                          _moveNexts[index] = enumerator.MoveNextAsync().AsTask();

                          return true;
                      }
                  }

                  break;
          }

          // Either all sources have finished or the iterator is in a state other than Allocated/Iterating.
          await DisposeAsync().ConfigureAwait(false);

          return false;
      }
  }
  362. }
  363. }