Merge.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  public static partial class AsyncEnumerableEx
  {
      /// <summary>
      /// Merges the given async sequences into a single sequence. All sources are enumerated concurrently
      /// (a <c>MoveNextAsync</c> call is kept in flight for every source that is not yet exhausted), and each
      /// element is yielded once the <c>MoveNextAsync</c> call of the source that produced it completes.
      /// </summary>
      /// <typeparam name="TSource">The element type of the source sequences.</typeparam>
      /// <param name="sources">The sequences to merge.</param>
      /// <returns>A sequence containing the elements of all <paramref name="sources"/>.</returns>
      /// <remarks>
      /// A null <paramref name="sources"/> array is rejected eagerly (via <c>Error.ArgumentNull</c>), before the
      /// returned sequence is enumerated.
      /// </remarks>
      public static IAsyncEnumerable<TSource> Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)
      {
          if (sources == null)
              throw Error.ArgumentNull(nameof(sources));

#if USE_ASYNC_ITERATOR
          return AsyncEnumerable.Create(Core);

          async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
          {
              var count = sources.Length;

              var enumerators = new IAsyncEnumerator<TSource>[count];
              var moveNextTasks = new Task<bool>[count];

              try
              {
                  for (var i = 0; i < count; i++)
                  {
                      IAsyncEnumerator<TSource> enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
                      enumerators[i] = enumerator;

                      // REVIEW: This follows the lead of the original implementation where we kick off MoveNextAsync
                      //         operations immediately. An alternative would be to do this in a separate stage, thus
                      //         preventing concurrency across MoveNextAsync and GetAsyncEnumerator calls and avoiding
                      //         any MoveNextAsync calls before all enumerators are acquired (or an exception has
                      //         occurred doing so).

                      moveNextTasks[i] = enumerator.MoveNextAsync().AsTask();
                  }

                  int active = count;

                  while (active > 0)
                  {
                      // REVIEW: Performance of WhenAny may be an issue when called repeatedly like this. We should
                      //         measure and could consider operating directly on the ValueTask<bool> objects, thus
                      //         also preventing the Task<bool> allocations from AsTask.

                      var moveNextTask = await Task.WhenAny(moveNextTasks).ConfigureAwait(false);

                      int index = Array.IndexOf(moveNextTasks, moveNextTask);

                      IAsyncEnumerator<TSource> enumerator = enumerators[index];

                      // The completed task is consumed right here. Park its slot before observing the result: if the
                      // task faulted, the exception propagates from the await below exactly once, and the finally
                      // block (which skips TaskExt.Never slots) will not await the same faulted task a second time
                      // and rethrow it wrapped in an AggregateException. enumerators[index] is left in place so the
                      // finally block still disposes it.
                      moveNextTasks[index] = TaskExt.Never;

                      if (!await moveNextTask.ConfigureAwait(false))
                      {
                          // REVIEW: The original implementation did not dispose eagerly, which could lead to resource
                          //         leaks when merged with other long-running sequences.

                          enumerators[index] = null; // NB: Avoids attempt at double dispose in finally if disposing fails.
                          await enumerator.DisposeAsync().ConfigureAwait(false);

                          active--;
                      }
                      else
                      {
                          // Read Current before issuing the next MoveNextAsync, which may overwrite it.
                          TSource item = enumerator.Current;

                          moveNextTasks[index] = enumerator.MoveNextAsync().AsTask();

                          yield return item;
                      }
                  }
              }
              finally
              {
                  // REVIEW: The original implementation performs a concurrent dispose, which seems undesirable given the
                  //         additional uncontrollable source of concurrency and the sequential resource acquisition. In
                  //         this modern implementation, we release resources in opposite order as we acquired them, thus
                  //         guaranteeing determinism (and mimicking a series of nested `await using` statements).

                  // REVIEW: If we decide to phase GetAsyncEnumerator and the initial MoveNextAsync calls at the start of
                  //         the operator implementation, we should make this symmetric and first await all in flight
                  //         MoveNextAsync operations, prior to disposing the enumerators.

                  var errors = default(List<Exception>);

                  for (var i = count - 1; i >= 0; i--)
                  {
                      // Null slots: GetAsyncEnumerator (or the initial MoveNextAsync) threw before reaching them.
                      Task<bool> moveNextTask = moveNextTasks[i];
                      IAsyncEnumerator<TSource> enumerator = enumerators[i];

                      try
                      {
                          try
                          {
                              // Wait for any in-flight MoveNextAsync before disposing its enumerator.
                              if (moveNextTask != null && moveNextTask != TaskExt.Never)
                              {
                                  _ = await moveNextTask.ConfigureAwait(false);
                              }
                          }
                          finally
                          {
                              if (enumerator != null)
                              {
                                  await enumerator.DisposeAsync().ConfigureAwait(false);
                              }
                          }
                      }
                      catch (Exception ex)
                      {
                          if (errors == null)
                          {
                              errors = new List<Exception>();
                          }

                          errors.Add(ex);
                      }
                  }

                  // NB: If we had any errors during cleaning (and awaiting pending operations), we throw these exceptions
                  //     instead of the original exception that may have led to running the finally block. This is similar
                  //     to throwing from any finally block (except that we catch all exceptions to ensure cleanup of all
                  //     concurrent sequences being merged).

                  if (errors != null)
                  {
                      throw new AggregateException(errors);
                  }
              }
          }
#else
          return new MergeAsyncIterator<TSource>(sources);
#endif
      }

      /// <summary>
      /// Merges the given async sequences by flattening them with <c>SelectMany</c>. Unlike the array overload,
      /// this overload does not enumerate the sources concurrently.
      /// </summary>
      /// <typeparam name="TSource">The element type of the source sequences.</typeparam>
      /// <param name="sources">The sequences to merge.</param>
      /// <returns>A sequence containing the elements of all <paramref name="sources"/>.</returns>
      /// <remarks>A null <paramref name="sources"/> is rejected eagerly (via <c>Error.ArgumentNull</c>).</remarks>
      public static IAsyncEnumerable<TSource> Merge<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
      {
          if (sources == null)
              throw Error.ArgumentNull(nameof(sources));

          //
          // REVIEW: This implementation does not exploit concurrency. We should not introduce such behavior in order to
          //         avoid breaking changes, but we could introduce a parallel ConcurrentMerge implementation. It is
          //         unfortunate though that the Merge overload accepting an array has always been concurrent, so we can't
          //         change that either (in order to have consistency where Merge is non-concurrent, and ConcurrentMerge
          //         is). We could consider a breaking change to Ix Async to streamline this, but we should do so when
          //         shipping with the BCL interfaces (which is already a breaking change to existing Ix Async users). If
          //         we go that route, we can either have:
          //
          //         - All overloads of Merge are concurrent
          //           - and continue to be named Merge, or,
          //           - are renamed to ConcurrentMerge for clarity (likely alongside a ConcurrentZip).
          //         - All overloads of Merge are non-concurrent
          //           - and are simply SelectMany operator macros (maybe more optimized)
          //         - Have ConcurrentMerge next to Merge overloads
          //           - where ConcurrentMerge may need a degree of concurrency parameter (and maybe other options), and,
          //           - where the overload set of both families may be asymmetric
          //

          return sources.ToAsyncEnumerable().SelectMany(source => source);
      }

      /// <summary>
      /// Merges an async sequence of async sequences by flattening it with <c>SelectMany</c>. Unlike the array
      /// overload, this overload does not enumerate the inner sequences concurrently.
      /// </summary>
      /// <typeparam name="TSource">The element type of the inner sequences.</typeparam>
      /// <param name="sources">The sequence of sequences to merge.</param>
      /// <returns>A sequence containing the elements of all inner sequences.</returns>
      /// <remarks>A null <paramref name="sources"/> is rejected eagerly (via <c>Error.ArgumentNull</c>).</remarks>
      public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
      {
          if (sources == null)
              throw Error.ArgumentNull(nameof(sources));

          //
          // REVIEW: This implementation does not exploit concurrency. We should not introduce such behavior in order to
          //         avoid breaking changes, but we could introduce a parallel ConcurrentMerge implementation.
          //

          return sources.SelectMany(source => source);
      }

      /// <summary>
      /// Hand-written iterator backing the array overload of <c>Merge</c> when <c>USE_ASYNC_ITERATOR</c> is not
      /// defined. On the first <see cref="MoveNextCore"/> call (state <c>Allocated</c>) it acquires an enumerator
      /// for every source and starts its first <c>MoveNextAsync</c>; afterwards it yields whichever source
      /// completes first.
      /// </summary>
      private sealed class MergeAsyncIterator<TSource> : AsyncIterator<TSource>
      {
          private readonly IAsyncEnumerable<TSource>[] _sources;

          // One enumerator per source; null before the first MoveNextCore call and after DisposeAsync.
          private IAsyncEnumerator<TSource>[] _enumerators;

          // Pending MoveNextAsync task per source; TaskExt.Never once that source reported no more elements.
          private Task<bool>[] _moveNexts;

          // Number of sources that have not yet reported the end of their sequence.
          private int _active;

          public MergeAsyncIterator(IAsyncEnumerable<TSource>[] sources)
          {
              Debug.Assert(sources != null);

              _sources = sources;
          }

          public override AsyncIteratorBase<TSource> Clone()
          {
              return new MergeAsyncIterator<TSource>(_sources);
          }

          /// <summary>
          /// Disposes all acquired source enumerators concurrently, then disposes the base iterator.
          /// </summary>
          /// <remarks>
          /// NOTE(review): in-flight <c>_moveNexts</c> are not awaited before their enumerators are disposed; the
          /// <c>USE_ASYNC_ITERATOR</c> implementation does await them first.
          /// </remarks>
          public override async ValueTask DisposeAsync()
          {
              // Detach before awaiting, so that a repeated DisposeAsync call (e.g. after a child dispose failed)
              // does not attempt to dispose the same enumerators a second time.
              var enumerators = _enumerators;
              _enumerators = null;

              try
              {
                  if (enumerators != null)
                  {
                      var disposes = new List<Task>(enumerators.Length);

                      foreach (var enumerator in enumerators)
                      {
                          // If GetAsyncEnumerator threw while MoveNextCore was acquiring enumerators, the slot of
                          // that source and all later slots are still null; there is nothing to dispose there, and
                          // dereferencing them would mask the original exception with a NullReferenceException.
                          if (enumerator != null)
                          {
                              disposes.Add(enumerator.DisposeAsync().AsTask());
                          }
                      }

                      await Task.WhenAll(disposes).ConfigureAwait(false);
                  }
              }
              finally
              {
                  await base.DisposeAsync().ConfigureAwait(false);
              }
          }

          protected override async ValueTask<bool> MoveNextCore()
          {
              switch (_state)
              {
                  case AsyncIteratorState.Allocated:
                      var n = _sources.Length;

                      _enumerators = new IAsyncEnumerator<TSource>[n];
                      _moveNexts = new Task<bool>[n];
                      _active = n;

                      for (var i = 0; i < n; i++)
                      {
                          var enumerator = _sources[i].GetAsyncEnumerator(_cancellationToken);
                          _enumerators[i] = enumerator;
                          _moveNexts[i] = enumerator.MoveNextAsync().AsTask();
                      }

                      _state = AsyncIteratorState.Iterating;
                      goto case AsyncIteratorState.Iterating;

                  case AsyncIteratorState.Iterating:
                      while (_active > 0)
                      {
                          //
                          // REVIEW: This approach does have a bias towards giving sources on the left
                          //         priority over sources on the right when yielding values. We may
                          //         want to consider a "prefer fairness" option.
                          //

                          var moveNext = await Task.WhenAny(_moveNexts).ConfigureAwait(false);

                          var index = Array.IndexOf(_moveNexts, moveNext);

                          if (!await moveNext.ConfigureAwait(false))
                          {
                              // Exhausted: park the slot on a task that never completes so WhenAny ignores it.
                              // The enumerator itself is only disposed in DisposeAsync.
                              _moveNexts[index] = TaskExt.Never;
                              _active--;
                          }
                          else
                          {
                              // Capture Current before restarting MoveNextAsync on the same enumerator.
                              var enumerator = _enumerators[index];
                              _current = enumerator.Current;
                              _moveNexts[index] = enumerator.MoveNextAsync().AsTask();
                              return true;
                          }
                      }

                      break;
              }

              // All sources exhausted (or the iterator is in any other state): release resources and finish.
              await DisposeAsync().ConfigureAwait(false);
              return false;
          }
      }
  }
  228. }