Merge.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  namespace System.Linq
  {
      public static partial class AsyncEnumerableEx
      {
          /// <summary>
          /// Merges the elements of all the given async-enumerable sequences into a single sequence.
          /// Unlike the other <c>Merge</c> overloads, the sources are consumed concurrently: a
          /// <c>MoveNextAsync</c> operation is started on every source up front, and elements are
          /// yielded as those operations complete.
          /// </summary>
          /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
          /// <param name="sources">
          /// The sequences to merge. The array reference itself is checked for null eagerly (via
          /// <c>Error.ArgumentNull</c>) when <c>Merge</c> is called; its elements are only read inside
          /// the iterator, i.e. when the result is enumerated.
          /// </param>
          /// <returns>A sequence that yields the elements of all sources as they become available.</returns>
          /// <remarks>
          /// Each source enumerator is disposed as soon as it reports that it is exhausted. When the
          /// iteration ends (normally, early, or with an error), any outstanding <c>MoveNextAsync</c>
          /// operations are awaited and the remaining enumerators are disposed in reverse order of
          /// acquisition; exceptions raised during that cleanup are collected and rethrown together as an
          /// <see cref="AggregateException"/>.
          /// </remarks>
          public static IAsyncEnumerable<TSource> Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)
          {
              // Argument validation happens here, outside the iterator, so it runs at call time.
              if (sources == null)
                  throw Error.ArgumentNull(nameof(sources));

              // NOTE(review): null *elements* of the array are not validated; a null entry makes
              // sources[i].GetAsyncEnumerator(...) below fail with a NullReferenceException during enumeration.

  #if USE_FAIR_AND_CHEAPER_MERGE
              //
              // This new implementation of Merge differs from the original one in a few ways:
              //
              // - It's cheaper because:
              //   - no conversion from ValueTask<bool> to Task<bool> takes place using AsTask,
              //   - we don't instantiate Task.WhenAny tasks for each iteration.
              // - It's fairer because:
              //   - the MoveNextAsync tasks are awaited concurrently, but completions are queued,
              //     instead of awaiting a new WhenAny task where "left" sources have preferential
              //     treatment over "right" sources.
              //
              return AsyncEnumerable.Create(Core);

              // The iterator body: acquires all enumerators, then repeatedly yields from whichever source
              // completes its pending MoveNextAsync next, until every source is exhausted.
              async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
              {
                  var count = sources.Length;

                  // Slot i holds the enumerator of sources[i] (null once it has been disposed eagerly) and its
                  // currently pending MoveNextAsync operation.
                  var enumerators = new IAsyncEnumerator<TSource>[count];
                  var moveNextTasks = new ValueTask<bool>[count];

                  try
                  {
                      for (var i = 0; i < count; i++)
                      {
                          IAsyncEnumerator<TSource> enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
                          enumerators[i] = enumerator;

                          // REVIEW: This follows the lead of the original implementation where we kick off MoveNextAsync
                          //         operations immediately. An alternative would be to do this in a separate stage, thus
                          //         preventing concurrency across MoveNextAsync and GetAsyncEnumerator calls and avoiding
                          //         any MoveNextAsync calls before all enumerators are acquired (or an exception has
                          //         occurred doing so).

                          moveNextTasks[i] = enumerator.MoveNextAsync();
                      }

                      // Per the notes further down, whenAny keeps a reference to the moveNextTasks array (no copy is
                      // made); awaiting it produces the index of a MoveNextAsync operation that has completed.
                      var whenAny = TaskExt.WhenAny(moveNextTasks);

                      // Number of sources that have not yet reported the end of their sequence.
                      int active = count;

                      while (active > 0)
                      {
                          // NOTE(review): unlike the other awaits in this file, this one has no ConfigureAwait(false);
                          //               confirm whether that is intentional for the whenAny awaitable.
                          int index = await whenAny;

                          IAsyncEnumerator<TSource> enumerator = enumerators[index];
                          ValueTask<bool> moveNextTask = moveNextTasks[index];

                          // NOTE(review): if this await throws, moveNextTasks[index] still holds this same ValueTask,
                          //               and the finally block below awaits it a second time. ValueTask instances are
                          //               not meant to be awaited more than once; confirm this is safe for the
                          //               sources involved.
                          if (!await moveNextTask.ConfigureAwait(false))
                          {
                              //
                              // Replace the task in our array by a completed task to make finally logic easier. Note that
                              // the WhenAnyValueTask object has a reference to our array (i.e. no copy is made), so this
                              // gets rid of any resources the original task may have held onto. However, we *don't* call
                              // whenAny.Replace to set this value, because it'd attach an awaiter to the already completed
                              // task, causing spurious wake-ups when awaiting whenAny.
                              //
                              moveNextTasks[index] = new ValueTask<bool>();

                              // REVIEW: The original implementation did not dispose eagerly, which could lead to resource
                              //         leaks when merged with other long-running sequences.

                              enumerators[index] = null; // NB: Avoids attempt at double dispose in finally if disposing fails.

                              await enumerator.DisposeAsync().ConfigureAwait(false);

                              active--;
                          }
                          else
                          {
                              // Read Current before starting the next MoveNextAsync on the same enumerator.
                              TSource item = enumerator.Current;

                              //
                              // Replace the task using whenAny.Replace, which will write it to the moveNextTasks array, and
                              // will start awaiting the task. Note we don't have to write to moveNextTasks ourselves because
                              // the whenAny object has a reference to it (i.e. no copy is made).
                              //
                              whenAny.Replace(index, enumerator.MoveNextAsync());

                              yield return item;
                          }
                      }
                  }
                  finally
                  {
                      // REVIEW: The original implementation performs a concurrent dispose, which seems undesirable given the
                      //         additional uncontrollable source of concurrency and the sequential resource acquisition. In
                      //         this modern implementation, we release resources in opposite order as we acquired them, thus
                      //         guaranteeing determinism (and mimicking a series of nested `await using` statements).

                      // REVIEW: If we decide to phase GetAsyncEnumerator and the initial MoveNextAsync calls at the start of
                      //         the operator implementation, we should make this symmetric and first await all in-flight
                      //         MoveNextAsync operations, prior to disposing the enumerators.

                      // Exceptions from awaiting pending operations and from disposing are collected here so that every
                      // remaining source still gets cleaned up.
                      var errors = default(List<Exception>);

                      for (var i = count - 1; i >= 0; i--)
                      {
                          ValueTask<bool> moveNextTask = moveNextTasks[i];
                          IAsyncEnumerator<TSource> enumerator = enumerators[i];

                          try
                          {
                              try
                              {
                                  //
                                  // Await the task to ensure outstanding work is completed prior to performing a dispose
                                  // operation. Note that we don't have to do anything special for tasks belonging to
                                  // enumerators that have finished; we swapped in a placeholder completed task.
                                  //
                                  // REVIEW: This adds an additional continuation to all of the pending tasks (note that
                                  //         whenAny also has registered one). The whenAny object will be collectible
                                  //         after all of these complete. Alternatively, we could drain via whenAny, by
                                  //         awaiting it until the active count drops to 0. This saves on attaching the
                                  //         additional continuations, but we need to decide on order of dispose. Right
                                  //         now, we dispose in opposite order of acquiring the enumerators, with the
                                  //         exception of enumerators that were disposed eagerly upon early completion.
                                  //         Should we care about the dispose order at all?
                                  //
                                  // NOTE(review): per the ValueTask guidance, a ValueTask should be awaited at most once;
                                  //               attaching this second awaiter may not be supported when the ValueTask
                                  //               is backed by an IValueTaskSource. Confirm against TaskExt.WhenAny.
                                  //
                                  _ = await moveNextTask.ConfigureAwait(false);
                              }
                              finally
                              {
                                  // Null means the enumerator was already disposed eagerly in the loop above.
                                  if (enumerator != null)
                                  {
                                      await enumerator.DisposeAsync().ConfigureAwait(false);
                                  }
                              }
                          }
                          catch (Exception ex)
                          {
                              if (errors == null)
                              {
                                  errors = new List<Exception>();
                              }

                              errors.Add(ex);
                          }
                      }

                      // NB: If we had any errors during cleaning (and awaiting pending operations), we throw these exceptions
                      //     instead of the original exception that may have led to running the finally block. This is similar
                      //     to throwing from any finally block (except that we catch all exceptions to ensure cleanup of all
                      //     concurrent sequences being merged).

                      if (errors != null)
                      {
                          throw new AggregateException(errors);
                      }
                  }
              }
  #else
              return AsyncEnumerable.Create(Core);

              // The iterator body: acquires all enumerators, then repeatedly yields from whichever source
              // completes its pending MoveNextAsync next (via Task.WhenAny), until every source is exhausted.
              async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
              {
                  var count = sources.Length;

                  // Slot i holds the enumerator of sources[i] (null once it has been disposed eagerly) and its
                  // currently pending MoveNextAsync operation, converted to a Task<bool> so Task.WhenAny can be used.
                  var enumerators = new IAsyncEnumerator<TSource>[count];
                  var moveNextTasks = new Task<bool>[count];

                  try
                  {
                      for (var i = 0; i < count; i++)
                      {
                          var enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
                          enumerators[i] = enumerator;

                          // REVIEW: This follows the lead of the original implementation where we kick off MoveNextAsync
                          //         operations immediately. An alternative would be to do this in a separate stage, thus
                          //         preventing concurrency across MoveNextAsync and GetAsyncEnumerator calls and avoiding
                          //         any MoveNextAsync calls before all enumerators are acquired (or an exception has
                          //         occurred doing so).

                          moveNextTasks[i] = enumerator.MoveNextAsync().AsTask();
                      }

                      // Number of sources that have not yet reported the end of their sequence.
                      var active = count;

                      while (active > 0)
                      {
                          // REVIEW: Performance of WhenAny may be an issue when called repeatedly like this. We should
                          //         measure and could consider operating directly on the ValueTask<bool> objects, thus
                          //         also preventing the Task<bool> allocations from AsTask.

                          var moveNextTask = await Task.WhenAny(moveNextTasks).ConfigureAwait(false);

                          // REVIEW: This seems wrong. AsTask can return the original Task<bool> (if the ValueTask<bool>
                          //         is wrapping one) or return a singleton instance for true and false, at which point
                          //         the use of IndexOf may pick an element closer to the start of the array because of
                          //         reference equality checks and aliasing effects. See GetTaskForResult in the BCL.
                          //         (Any slot holding that same Task object has completed with the same result, so the
                          //         picked slot is still a completed one; the effect is on which source is served first.)

                          var index = Array.IndexOf(moveNextTasks, moveNextTask);

                          var enumerator = enumerators[index];

                          // NOTE(review): if this await throws, moveNextTasks[index] still holds the faulted task, so the
                          //               finally block awaits it again; the original exception then ends up inside the
                          //               AggregateException thrown there rather than propagating on its own.
                          if (!await moveNextTask.ConfigureAwait(false))
                          {
                              // Park the exhausted slot on TaskExt.Never (presumably a task that never completes, so
                              // WhenAny cannot select this slot again; the finally block skips awaiting it).
                              moveNextTasks[index] = TaskExt.Never;

                              // REVIEW: The original implementation did not dispose eagerly, which could lead to resource
                              //         leaks when merged with other long-running sequences.

                              enumerators[index] = null; // NB: Avoids attempt at double dispose in finally if disposing fails.

                              await enumerator.DisposeAsync().ConfigureAwait(false);

                              active--;
                          }
                          else
                          {
                              // Read Current before starting the next MoveNextAsync on the same enumerator.
                              var item = enumerator.Current;

                              moveNextTasks[index] = enumerator.MoveNextAsync().AsTask();

                              yield return item;
                          }
                      }
                  }
                  finally
                  {
                      // REVIEW: The original implementation performs a concurrent dispose, which seems undesirable given the
                      //         additional uncontrollable source of concurrency and the sequential resource acquisition. In
                      //         this modern implementation, we release resources in opposite order as we acquired them, thus
                      //         guaranteeing determinism (and mimicking a series of nested `await using` statements).

                      // REVIEW: If we decide to phase GetAsyncEnumerator and the initial MoveNextAsync calls at the start of
                      //         the operator implementation, we should make this symmetric and first await all in-flight
                      //         MoveNextAsync operations, prior to disposing the enumerators.

                      // Exceptions from awaiting pending operations and from disposing are collected here so that every
                      // remaining source still gets cleaned up.
                      var errors = default(List<Exception>);

                      for (var i = count - 1; i >= 0; i--)
                      {
                          var moveNextTask = moveNextTasks[i];
                          var enumerator = enumerators[i];

                          try
                          {
                              try
                              {
                                  // A null slot means MoveNextAsync was never started for this index (e.g. acquisition
                                  // failed part-way); TaskExt.Never marks a source that was already exhausted.
                                  if (moveNextTask != null && moveNextTask != TaskExt.Never)
                                  {
                                      _ = await moveNextTask.ConfigureAwait(false);
                                  }
                              }
                              finally
                              {
                                  // Null means the enumerator was never acquired or was already disposed eagerly.
                                  if (enumerator != null)
                                  {
                                      await enumerator.DisposeAsync().ConfigureAwait(false);
                                  }
                              }
                          }
                          catch (Exception ex)
                          {
                              if (errors == null)
                              {
                                  errors = new List<Exception>();
                              }

                              errors.Add(ex);
                          }
                      }

                      // NB: If we had any errors during cleaning (and awaiting pending operations), we throw these exceptions
                      //     instead of the original exception that may have led to running the finally block. This is similar
                      //     to throwing from any finally block (except that we catch all exceptions to ensure cleanup of all
                      //     concurrent sequences being merged).

                      if (errors != null)
                      {
                          throw new AggregateException(errors);
                      }
                  }
              }
  #endif
          }

          /// <summary>
          /// Merges a synchronous sequence of async-enumerable sequences into a single sequence by
          /// converting it with <c>ToAsyncEnumerable</c> and flattening it with <c>SelectMany</c>.
          /// Per the review note below, this overload does not enumerate the inner sequences concurrently.
          /// </summary>
          /// <typeparam name="TSource">The type of the elements in the inner sequences.</typeparam>
          /// <param name="sources">
          /// The sequences to merge; checked for null eagerly (via <c>Error.ArgumentNull</c>).
          /// </param>
          /// <returns>The flattened sequence.</returns>
          public static IAsyncEnumerable<TSource> Merge<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
          {
              if (sources == null)
                  throw Error.ArgumentNull(nameof(sources));

              //
              // REVIEW: This implementation does not exploit concurrency. We should not introduce such behavior in order to
              //         avoid breaking changes, but we could introduce a parallel ConcurrentMerge implementation. It is
              //         unfortunate though that the Merge overload accepting an array has always been concurrent, so we can't
              //         change that either (in order to have consistency where Merge is non-concurrent, and ConcurrentMerge
              //         is). We could consider a breaking change to Ix Async to streamline this, but we should do so when
              //         shipping with the BCL interfaces (which is already a breaking change to existing Ix Async users). If
              //         we go that route, we can either have:
              //
              //         - All overloads of Merge are concurrent
              //           - and continue to be named Merge, or,
              //           - are renamed to ConcurrentMerge for clarity (likely alongside a ConcurrentZip).
              //         - All overloads of Merge are non-concurrent
              //           - and are simply SelectMany operator macros (maybe more optimized)
              //         - Have ConcurrentMerge next to Merge overloads
              //           - where ConcurrentMerge may need a degree of concurrency parameter (and maybe other options), and,
              //           - where the overload set of both families may be asymmetric
              //

              return sources.ToAsyncEnumerable().SelectMany(source => source);
          }

          /// <summary>
          /// Merges an async-enumerable sequence of async-enumerable sequences into a single sequence by
          /// flattening it with <c>SelectMany</c>. Per the review note below, this overload does not
          /// enumerate the inner sequences concurrently.
          /// </summary>
          /// <typeparam name="TSource">The type of the elements in the inner sequences.</typeparam>
          /// <param name="sources">
          /// The sequences to merge; checked for null eagerly (via <c>Error.ArgumentNull</c>).
          /// </param>
          /// <returns>The flattened sequence.</returns>
          public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
          {
              if (sources == null)
                  throw Error.ArgumentNull(nameof(sources));

              //
              // REVIEW: This implementation does not exploit concurrency. We should not introduce such behavior in order to
              //         avoid breaking changes, but we could introduce a parallel ConcurrentMerge implementation.
              //

              return sources.SelectMany(source => source);
          }
      }
  }