Merge.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerableEx
  10. {
  11. /// <summary>
  12. /// Merges elements from all of the specified async-enumerable sequences into a single async-enumerable sequence.
  13. /// </summary>
  /// <remarks>
  /// An enumerator is obtained from every source up front and its first <c>MoveNextAsync</c> call is started
  /// immediately, so the sources are advanced concurrently; elements are yielded as the pending calls complete.
  /// </remarks>
  14. /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
  15. /// <param name="sources">The async-enumerable sequences to merge.</param>
  16. /// <returns>The async-enumerable sequence that merges the elements of the async-enumerable sequences.</returns>
  17. /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  18. public static IAsyncEnumerable<TSource> Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)
  19. {
  20. if (sources == null)
  21. throw Error.ArgumentNull(nameof(sources));
  // Both branches of the next conditional only supply the return statement and the header of the local iterator
  // `Core`; the iterator's body is the block selected by the USE_FAIR_AND_CHEAPER_MERGE conditional that follows.
  22. #if HAS_ASYNC_ENUMERABLE_CANCELLATION
  23. return Core(sources);
  24. static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource>[] sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  25. #else
  26. return AsyncEnumerable.Create(Core);
  27. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  28. #endif
  29. #if USE_FAIR_AND_CHEAPER_MERGE
  30. //
  31. // This new implementation of Merge differs from the original one in a few ways:
  32. //
  33. // - It's cheaper because:
  34. // - no conversion from ValueTask<bool> to Task<bool> takes place using AsTask,
  35. // - we don't instantiate Task.WhenAny tasks for each iteration.
  36. // - It's fairer because:
  37. // - the MoveNextAsync tasks are awaited concurrently, but completions are queued,
  38. // instead of awaiting a new WhenAny task where "left" sources have preferential
  39. // treatment over "right" sources.
  40. //
  41. {
  // One slot per source: enumerators[i] and moveNextTasks[i] both belong to sources[i].
  42. var count = sources.Length;
  43. var enumerators = new IAsyncEnumerator<TSource>[count];
  44. var moveNextTasks = new ValueTask<bool>[count];
  45. try
  46. {
  47. for (var i = 0; i < count; i++)
  48. {
  49. IAsyncEnumerator<TSource> enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
  50. enumerators[i] = enumerator;
  51. // REVIEW: This follows the lead of the original implementation where we kick off MoveNextAsync
  52. // operations immediately. An alternative would be to do this in a separate stage, thus
  53. // preventing concurrency across MoveNextAsync and GetAsyncEnumerator calls and avoiding
  54. // any MoveNextAsync calls before all enumerators are acquired (or an exception has
  55. // occurred doing so).
  56. moveNextTasks[i] = enumerator.MoveNextAsync();
  57. }
  // A single awaitable over the whole task array; as used below, awaiting it produces the index of a slot
  // whose MoveNextAsync task is ready to be observed.
  58. var whenAny = TaskExt.WhenAny(moveNextTasks);
  // Number of sources that have not yet reported the end of their sequence.
  59. int active = count;
  60. while (active > 0)
  61. {
  62. int index = await whenAny;
  63. IAsyncEnumerator<TSource> enumerator = enumerators[index];
  64. ValueTask<bool> moveNextTask = moveNextTasks[index];
  65. if (!await moveNextTask.ConfigureAwait(false))
  66. {
  67. //
  68. // Replace the task in our array by a completed task to make finally logic easier. Note that
  69. // the WhenAnyValueTask object has a reference to our array (i.e. no copy is made), so this
  70. // gets rid of any resources the original task may have held onto. However, we *don't* call
  71. // whenAny.Replace to set this value, because it'd attach an awaiter to the already completed
  72. // task, causing spurious wake-ups when awaiting whenAny.
  73. //
  74. moveNextTasks[index] = new ValueTask<bool>();
  75. // REVIEW: The original implementation did not dispose eagerly, which could lead to resource
  76. // leaks when merged with other long-running sequences.
  77. enumerators[index] = null; // NB: Avoids attempt at double dispose in finally if disposing fails.
  78. await enumerator.DisposeAsync().ConfigureAwait(false);
  79. active--;
  80. }
  81. else
  82. {
  // Capture Current before the next MoveNextAsync call is started on the same enumerator.
  83. TSource item = enumerator.Current;
  84. //
  85. // Replace the task using whenAny.Replace, which will write it to the moveNextTasks array, and
  86. // will start awaiting the task. Note we don't have to write to moveNextTasks ourselves because
  87. // the whenAny object has a reference to it (i.e. no copy is made).
  88. //
  89. whenAny.Replace(index, enumerator.MoveNextAsync());
  90. yield return item;
  91. }
  92. }
  93. }
  94. finally
  95. {
  96. // REVIEW: The original implementation performs a concurrent dispose, which seems undesirable given the
  97. // additional uncontrollable source of concurrency and the sequential resource acquisition. In
  98. // this modern implementation, we release resources in opposite order as we acquired them, thus
  99. // guaranteeing determinism (and mimicking a series of nested `await using` statements).
  100. // REVIEW: If we decide to phase GetAsyncEnumerator and the initial MoveNextAsync calls at the start of
  101. // the operator implementation, we should make this symmetric and first await all in flight
  102. // MoveNextAsync operations, prior to disposing the enumerators.
  // Exceptions raised while draining/disposing are collected lazily; the list stays null when cleanup is clean.
  103. var errors = default(List<Exception>);
  104. for (var i = count - 1; i >= 0; i--)
  105. {
  106. ValueTask<bool> moveNextTask = moveNextTasks[i];
  107. IAsyncEnumerator<TSource> enumerator = enumerators[i];
  108. try
  109. {
  110. try
  111. {
  112. //
  113. // Await the task to ensure outstanding work is completed prior to performing a dispose
  114. // operation. Note that we don't have to do anything special for tasks belonging to
  115. // enumerators that have finished; we swapped in a placeholder completed task.
  116. //
  117. // REVIEW: This adds an additional continuation to all of the pending tasks (note that
  118. // whenAny also has registered one). The whenAny object will be collectible
  119. // after all of these complete. Alternatively, we could drain via whenAny, by
  120. // awaiting it until the active count drops to 0. This saves on attaching the
  121. // additional continuations, but we need to decide on order of dispose. Right
  122. // now, we dispose in opposite order of acquiring the enumerators, with the
  123. // exception of enumerators that were disposed eagerly upon early completion.
  124. // Should we care about the dispose order at all?
  125. _ = await moveNextTask.ConfigureAwait(false);
  126. }
  127. finally
  128. {
  // A null slot was either cleared before the eager dispose in the loop above or never filled because
  // acquiring enumerators stopped early.
  129. if (enumerator != null)
  130. {
  131. await enumerator.DisposeAsync().ConfigureAwait(false);
  132. }
  133. }
  134. }
  135. catch (Exception ex)
  136. {
  137. if (errors == null)
  138. {
  139. errors = new List<Exception>();
  140. }
  141. errors.Add(ex);
  142. }
  143. }
  144. // NB: If we had any errors during cleaning (and awaiting pending operations), we throw these exceptions
  145. // instead of the original exception that may have led to running the finally block. This is similar
  146. // to throwing from any finally block (except that we catch all exceptions to ensure cleanup of all
  147. // concurrent sequences being merged).
  148. if (errors != null)
  149. {
  150. throw new AggregateException(errors);
  151. }
  152. }
  153. }
  154. #else
  155. {
  // One slot per source: enumerators[i] and moveNextTasks[i] both belong to sources[i].
  156. var count = sources.Length;
  157. var enumerators = new IAsyncEnumerator<TSource>?[count];
  158. var moveNextTasks = new Task<bool>[count];
  159. try
  160. {
  161. for (var i = 0; i < count; i++)
  162. {
  163. var enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
  164. enumerators[i] = enumerator;
  165. // REVIEW: This follows the lead of the original implementation where we kick off MoveNextAsync
  166. // operations immediately. An alternative would be to do this in a separate stage, thus
  167. // preventing concurrency across MoveNextAsync and GetAsyncEnumerator calls and avoiding
  168. // any MoveNextAsync calls before all enumerators are acquired (or an exception has
  169. // occurred doing so).
  170. moveNextTasks[i] = enumerator.MoveNextAsync().AsTask();
  171. }
  // Number of sources that have not yet reported the end of their sequence.
  172. var active = count;
  173. while (active > 0)
  174. {
  175. // REVIEW: Performance of WhenAny may be an issue when called repeatedly like this. We should
  176. // measure and could consider operating directly on the ValueTask<bool> objects, thus
  177. // also preventing the Task<bool> allocations from AsTask.
  178. var moveNextTask = await Task.WhenAny(moveNextTasks).ConfigureAwait(false);
  179. // REVIEW: This seems wrong. AsTask can return the original Task<bool> (if the ValueTask<bool>
  180. // is wrapping one) or return a singleton instance for true and false, at which point
  181. // the use of IndexOf may pick an element closer to the start of the array because of
  182. // reference equality checks and aliasing effects. See GetTaskForResult in the BCL.
  183. var index = Array.IndexOf(moveNextTasks, moveNextTask);
  184. var enumerator = enumerators[index]!; // NB: Only gets set to null after setting task to Never.
  185. if (!await moveNextTask.ConfigureAwait(false))
  186. {
  // NOTE(review): TaskExt.Never is presumably a task that never completes, which would take this slot out
  // of consideration for later Task.WhenAny calls - confirm against TaskExt.
  187. moveNextTasks[index] = TaskExt.Never;
  188. // REVIEW: The original implementation did not dispose eagerly, which could lead to resource
  189. // leaks when merged with other long-running sequences.
  190. enumerators[index] = null; // NB: Avoids attempt at double dispose in finally if disposing fails.
  191. await enumerator.DisposeAsync().ConfigureAwait(false);
  192. active--;
  193. }
  194. else
  195. {
  // Capture Current before the next MoveNextAsync call is started on the same enumerator.
  196. var item = enumerator.Current;
  197. moveNextTasks[index] = enumerator.MoveNextAsync().AsTask();
  198. yield return item;
  199. }
  200. }
  201. }
  202. finally
  203. {
  204. // REVIEW: The original implementation performs a concurrent dispose, which seems undesirable given the
  205. // additional uncontrollable source of concurrency and the sequential resource acquisition. In
  206. // this modern implementation, we release resources in opposite order as we acquired them, thus
  207. // guaranteeing determinism (and mimicking a series of nested `await using` statements).
  208. // REVIEW: If we decide to phase GetAsyncEnumerator and the initial MoveNextAsync calls at the start of
  209. // the operator implementation, we should make this symmetric and first await all in flight
  210. // MoveNextAsync operations, prior to disposing the enumerators.
  // Exceptions raised while draining/disposing are collected lazily; the list stays null when cleanup is clean.
  211. var errors = default(List<Exception>);
  212. for (var i = count - 1; i >= 0; i--)
  213. {
  214. var moveNextTask = moveNextTasks[i];
  215. var enumerator = enumerators[i];
  216. try
  217. {
  218. try
  219. {
  // Skip slots that were never filled (acquiring enumerators stopped before reaching them) and slots that
  // hold the Never placeholder written when their source finished.
  220. if (moveNextTask != null && moveNextTask != TaskExt.Never)
  221. {
  222. _ = await moveNextTask.ConfigureAwait(false);
  223. }
  224. }
  225. finally
  226. {
  // A null slot was either cleared before the eager dispose in the loop above or never filled.
  227. if (enumerator != null)
  228. {
  229. await enumerator.DisposeAsync().ConfigureAwait(false);
  230. }
  231. }
  232. }
  233. catch (Exception ex)
  234. {
  235. if (errors == null)
  236. {
  237. errors = new List<Exception>();
  238. }
  239. errors.Add(ex);
  240. }
  241. }
  242. // NB: If we had any errors during cleaning (and awaiting pending operations), we throw these exceptions
  243. // instead of the original exception that may have led to running the finally block. This is similar
  244. // to throwing from any finally block (except that we catch all exceptions to ensure cleanup of all
  245. // concurrent sequences being merged).
  246. if (errors != null)
  247. {
  248. throw new AggregateException(errors);
  249. }
  250. }
  251. }
  252. #endif
  253. }
  /// <summary>
  /// Flattens an enumerable collection of async-enumerable sequences into one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type shared by the inner sequences and by the resulting sequence.</typeparam>
  /// <param name="sources">The enumerable sequence whose elements are the async-enumerable sequences to merge.</param>
  /// <returns>An async-enumerable sequence producing the elements of the inner async-enumerable sequences.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Merge<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      if (sources is null)
      {
          throw Error.ArgumentNull(nameof(sources));
      }

      //
      // REVIEW: No concurrency is exploited here. Introducing it would be a breaking change, so it should be
      //         avoided; a separate, parallel ConcurrentMerge implementation could be offered instead. It is
      //         unfortunate that the array-based Merge overload has always been concurrent, which means it
      //         can't be changed either (to get consistency where Merge is non-concurrent and ConcurrentMerge
      //         is concurrent). A breaking change to Ix Async could streamline this, but it should coincide
      //         with shipping on the BCL interfaces (already a breaking change for existing Ix Async users).
      //         Going that route, the options are:
      //
      //         - All overloads of Merge are concurrent
      //           - and keep the name Merge, or,
      //           - get renamed to ConcurrentMerge for clarity (likely alongside a ConcurrentZip).
      //         - All overloads of Merge are non-concurrent
      //           - and are simply SelectMany operator macros (maybe more optimized)
      //         - ConcurrentMerge exists next to the Merge overloads
      //           - where ConcurrentMerge may need a degree of concurrency parameter (and maybe other options), and,
      //           - where the overload sets of both families may be asymmetric
      //

      // Lift the synchronous outer collection into an async-enumerable, then flatten it with the identity selector.
      var outer = sources.ToAsyncEnumerable();

      return outer.SelectMany(inner => inner);
  }
  /// <summary>
  /// Flattens an async-enumerable sequence of async-enumerable sequences into one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type shared by the inner sequences and by the resulting sequence.</typeparam>
  /// <param name="sources">The async-enumerable sequence whose elements are the inner async-enumerable sequences to merge.</param>
  /// <returns>An async-enumerable sequence producing the elements of the inner async-enumerable sequences.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      if (sources is null)
      {
          throw Error.ArgumentNull(nameof(sources));
      }

      //
      // REVIEW: No concurrency is exploited here. Introducing it would be a breaking change, so it should be
      //         avoided; a separate, parallel ConcurrentMerge implementation could be offered instead.
      //

      // Flatten the outer sequence with the identity selector.
      return sources.SelectMany(inner => inner);
  }
  302. }
  303. }