SelectMany.cs 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  12. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.selectmany?view=net-9.0-pp#system-linq-asyncenumerable-selectmany-3(system-collections-generic-iasyncenumerable((-0))-system-func((-0-system-collections-generic-iasyncenumerable((-1))))-system-func((-0-1-2)))
  /// <summary>
  /// Maps every element of an async-enumerable sequence to an inner async-enumerable sequence and merges the elements of those inner sequences into a single async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the projected inner sequences, and therefore of the merged result.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">One-to-many transform invoked for each source element.</param>
  /// <returns>An async-enumerable sequence containing the elements of the inner sequences produced by <paramref name="selector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // Arguments are validated eagerly; the actual work is deferred to the iterator.
      return new SelectManyAsyncIterator<TSource, TResult>(source, selector);
  }
  30. #endif // INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  31. // REVIEW: Should we keep these overloads that return ValueTask<IAsyncEnumerable<TResult>>? One could argue the selector is async twice.
  /// <summary>
  /// Maps every element of an async-enumerable sequence to an inner async-enumerable sequence obtained from an asynchronous selector, and merges the elements of those inner sequences into a single async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the projected inner sequences and of the merged result.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous one-to-many transform invoked for each source element.</param>
  /// <returns>An async-enumerable sequence containing the elements of the inner sequences produced by <paramref name="selector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  [GenerateAsyncOverload]
  [Obsolete("Use SelectMany. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the SelectManyAwait functionality now exists as overloads of SelectMany. You will need to modify your callback to take an additional CancellationToken argument.")]
  private static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // Arguments are validated eagerly; enumeration is handled by the dedicated iterator type.
      return new SelectManyAsyncIteratorWithTask<TSource, TResult>(source, selector);
  }
  51. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Maps every element of an async-enumerable sequence to an inner async-enumerable sequence obtained from an asynchronous, cancellation-aware selector, and merges the elements of those inner sequences into a single async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the projected inner sequences and of the merged result.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous one-to-many transform invoked for each source element; it also accepts a <see cref="CancellationToken"/>.</param>
  /// <returns>An async-enumerable sequence containing the elements of the inner sequences produced by <paramref name="selector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  [GenerateAsyncOverload]
  [Obsolete("Use SelectMany. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the SelectManyAwaitWithCancellation functionality now exists as overloads of SelectMany.")]
  private static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // Arguments are validated eagerly; enumeration is handled by the dedicated iterator type.
      return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector);
  }
  62. #endif
  63. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  64. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.selectmany?view=net-9.0-pp#system-linq-asyncenumerable-selectmany-2(system-collections-generic-iasyncenumerable((-0))-system-func((-0-system-int32-system-collections-generic-iasyncenumerable((-1)))))
  /// <summary>
  /// Maps every element of an async-enumerable sequence, together with its zero-based index, to an inner async-enumerable sequence and yields the elements of those inner sequences as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the projected inner sequences and of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">One-to-many transform invoked for each source element; its second argument is the index of that element in <paramref name="source"/>.</param>
  /// <returns>An async-enumerable sequence that yields, for each source element in turn, the elements of the inner sequence returned by <paramref name="selector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // Validation happens eagerly above; the async iterator below only runs once enumeration starts.
      return Core(source, selector);

      static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
      {
          // Starts one below zero so that the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: throws rather than wrapping once the index would exceed int.MaxValue.
              position = checked(position + 1);

              var projected = selector(item, position);

              await foreach (var innerItem in projected.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return innerItem;
              }
          }
      }
  }
  98. #endif // INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  /// <summary>
  /// Maps every element of an async-enumerable sequence, together with its zero-based index, to an inner async-enumerable sequence obtained by awaiting an asynchronous selector, and yields the elements of those inner sequences as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the projected inner sequences and of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous one-to-many transform invoked for each source element; its second argument is the index of that element.</param>
  /// <returns>An async-enumerable sequence whose elements are the result of invoking the one-to-many transform function on each element of the source sequence and awaiting the result.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  [GenerateAsyncOverload]
  [Obsolete("Use SelectMany. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the SelectManyAwait functionality now exists as overloads of SelectMany. You will need to modify your callback to take an additional CancellationToken argument.")]
  private static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // Validation happens eagerly above; the async iterator below only runs once enumeration starts.
      return Core(source, selector);

      static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
      {
          // Starts one below zero so that the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: throws rather than wrapping once the index would exceed int.MaxValue.
              position = checked(position + 1);

              var projected = await selector(item, position).ConfigureAwait(false);

              await foreach (var innerItem in projected.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return innerItem;
              }
          }
      }
  }
  134. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Maps every element of an async-enumerable sequence, together with its zero-based index, to an inner async-enumerable sequence obtained by awaiting a cancellation-aware asynchronous selector, and yields the elements of those inner sequences as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the projected inner sequences and of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous one-to-many transform invoked for each source element; it receives the element, its index, and the cancellation token of the running enumeration.</param>
  /// <returns>An async-enumerable sequence that yields, for each source element in turn, the elements of the inner sequence produced by awaiting <paramref name="selector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  [GenerateAsyncOverload]
  [Obsolete("Use SelectMany. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the SelectManyAwaitWithCancellation functionality now exists as overloads of SelectMany.")]
  private static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // Validation happens eagerly above; the async iterator below only runs once enumeration starts.
      return Core(source, selector);

      static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
      {
          // Starts one below zero so that the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: throws rather than wrapping once the index would exceed int.MaxValue.
              position = checked(position + 1);

              // The same token drives the outer enumeration, the selector call, and the inner enumeration.
              var projected = await selector(item, position, cancellationToken).ConfigureAwait(false);

              await foreach (var innerItem in projected.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return innerItem;
              }
          }
      }
  }
  161. #endif
  162. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  163. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.selectmany?view=net-9.0-pp#system-linq-asyncenumerable-selectmany-3(system-collections-generic-iasyncenumerable((-0))-system-func((-0-system-collections-generic-iasyncenumerable((-1))))-system-func((-0-1-2)))
  /// <summary>
  /// Maps every element of an async-enumerable sequence to an intermediate async-enumerable sequence, combines the source element with each element of that intermediate sequence through a result selector, and yields the combined values as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence, as produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">One-to-many transform invoked for each source element.</param>
  /// <param name="resultSelector">Transform invoked with a source element and one element of its intermediate sequence.</param>
  /// <returns>An async-enumerable sequence that yields <paramref name="resultSelector"/> applied to each source element paired with each element of the intermediate sequence returned for it by <paramref name="collectionSelector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      // Validation happens eagerly above; the async iterator below only runs once enumeration starts.
      return Core(source, collectionSelector, resultSelector);

      static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
      {
          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var intermediate = collectionSelector(item);

              await foreach (var innerItem in intermediate.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return resultSelector(item, innerItem);
              }
          }
      }
  }
  196. #endif // INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  /// <summary>
  /// Projects each element of an async-enumerable sequence to an async-enumerable sequence by awaiting the result of a transform function, invokes the result selector for each of the source elements and each of the corresponding inner-sequence's elements and awaits the result, and merges the results into one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  /// <typeparam name="TCollection">The type of elements in the projected intermediate sequences.</typeparam>
  /// <typeparam name="TResult">The type of elements in the result sequence.</typeparam>
  /// <param name="source">An async-enumerable sequence of elements to project.</param>
  /// <param name="collectionSelector">An asynchronous transform function to apply to each source element.</param>
  /// <param name="resultSelector">An asynchronous transform function to apply to each element of the intermediate sequence.</param>
  /// <returns>An async-enumerable sequence whose elements are the result of invoking the one-to-many transform function <paramref name="collectionSelector"/> on each element of the input sequence, awaiting the result, applying <paramref name="resultSelector"/> to each element of the intermediate sequences along with their corresponding source element and awaiting the result.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="collectionSelector"/>, or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
  [GenerateAsyncOverload]
  [Obsolete("Use SelectMany. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the SelectManyAwait functionality now exists as overloads of SelectMany. You will need to modify your callback to take an additional CancellationToken argument.")]
  private static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source == null)
          throw Error.ArgumentNull(nameof(source));
      if (collectionSelector == null)
          throw Error.ArgumentNull(nameof(collectionSelector));
      if (resultSelector == null)
          throw Error.ArgumentNull(nameof(resultSelector));

      // Arguments are validated eagerly; the async iterator below only runs once enumeration starts.
      return Core(source, collectionSelector, resultSelector);

      static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
      {
          await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // The intermediate sequence for this element is obtained asynchronously before it is enumerated.
              var inner = await collectionSelector(element).ConfigureAwait(false);

              await foreach (var subElement in inner.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return await resultSelector(element, subElement).ConfigureAwait(false);
              }
          }
      }
  }
  231. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Maps every element of an async-enumerable sequence to an intermediate async-enumerable sequence obtained by awaiting a cancellation-aware collection selector, combines the source element with each intermediate element by awaiting a cancellation-aware result selector, and yields the combined values as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous one-to-many transform invoked with each source element and the cancellation token of the running enumeration.</param>
  /// <param name="resultSelector">Asynchronous transform invoked with a source element, one element of its intermediate sequence, and the cancellation token of the running enumeration.</param>
  /// <returns>An async-enumerable sequence of the awaited results of <paramref name="resultSelector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="collectionSelector"/>, or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
  [GenerateAsyncOverload]
  [Obsolete("Use SelectMany. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the SelectManyAwaitWithCancellation functionality now exists as overloads of SelectMany.")]
  private static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      // Validation happens eagerly above; the async iterator below only runs once enumeration starts.
      return Core(source, collectionSelector, resultSelector);

      static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
      {
          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // The same token drives both enumerations and both selector calls.
              var intermediate = await collectionSelector(item, cancellationToken).ConfigureAwait(false);

              await foreach (var innerItem in intermediate.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return await resultSelector(item, innerItem, cancellationToken).ConfigureAwait(false);
              }
          }
      }
  }
  255. #endif
  256. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  257. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.selectmany?view=net-9.0-pp#system-linq-asyncenumerable-selectmany-3(system-collections-generic-iasyncenumerable((-0))-system-func((-0-system-int32-system-collections-generic-ienumerable((-1))))-system-func((-0-1-2)))
  /// <summary>
  /// Maps every element of an async-enumerable sequence, together with its zero-based index, to an intermediate async-enumerable sequence, combines the source element with each element of that intermediate sequence through a result selector, and yields the combined values as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence, as produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">One-to-many transform invoked for each source element; its second argument is the index of that element in <paramref name="source"/>.</param>
  /// <param name="resultSelector">Transform invoked with a source element and one element of its intermediate sequence.</param>
  /// <returns>An async-enumerable sequence that yields <paramref name="resultSelector"/> applied to each source element paired with each element of the intermediate sequence returned for it by <paramref name="collectionSelector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      // Validation happens eagerly above; the async iterator below only runs once enumeration starts.
      return Core(source, collectionSelector, resultSelector);

      static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
      {
          // Starts one below zero so that the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: throws rather than wrapping once the index would exceed int.MaxValue.
              position = checked(position + 1);

              var intermediate = collectionSelector(item, position);

              await foreach (var innerItem in intermediate.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return resultSelector(item, innerItem);
              }
          }
      }
  }
  295. #endif // INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  /// <summary>
  /// Maps every element of an async-enumerable sequence, together with its zero-based index, to an intermediate async-enumerable sequence obtained by awaiting a collection selector,
  /// combines the source element with each element of the corresponding intermediate sequence by awaiting a result selector, and yields the combined values as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous one-to-many transform invoked for each source element; its second argument is the index of that element.</param>
  /// <param name="resultSelector">Asynchronous transform invoked with a source element and one element of its intermediate sequence.</param>
  /// <returns>An async-enumerable sequence whose elements are obtained by awaiting <paramref name="collectionSelector"/> for each element of the input sequence and then awaiting <paramref name="resultSelector"/> for each element of the intermediate sequence along with its corresponding source element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="collectionSelector"/>, or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
  [GenerateAsyncOverload]
  [Obsolete("Use SelectMany. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the SelectManyAwait functionality now exists as overloads of SelectMany. You will need to modify your callback to take an additional CancellationToken argument.")]
  private static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      // Validation happens eagerly above; the async iterator below only runs once enumeration starts.
      return Core(source, collectionSelector, resultSelector);

      static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
      {
          // Starts one below zero so that the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: throws rather than wrapping once the index would exceed int.MaxValue.
              position = checked(position + 1);

              var intermediate = await collectionSelector(item, position).ConfigureAwait(false);

              await foreach (var innerItem in intermediate.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return await resultSelector(item, innerItem).ConfigureAwait(false);
              }
          }
      }
  }
  336. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Maps every element of an async-enumerable sequence, together with its zero-based index, to an intermediate async-enumerable sequence obtained by awaiting a cancellation-aware collection selector, combines the source element with each intermediate element by awaiting a cancellation-aware result selector, and yields the combined values as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous one-to-many transform invoked with each source element, its index, and the cancellation token of the running enumeration.</param>
  /// <param name="resultSelector">Asynchronous transform invoked with a source element, one element of its intermediate sequence, and the cancellation token of the running enumeration.</param>
  /// <returns>An async-enumerable sequence of the awaited results of <paramref name="resultSelector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="collectionSelector"/>, or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
  [GenerateAsyncOverload]
  [Obsolete("Use SelectMany. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the SelectManyAwaitWithCancellation functionality now exists as overloads of SelectMany.")]
  private static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      // Validation happens eagerly above; the async iterator below only runs once enumeration starts.
      return Core(source, collectionSelector, resultSelector);

      static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
      {
          // Starts one below zero so that the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: throws rather than wrapping once the index would exceed int.MaxValue.
              position = checked(position + 1);

              // The same token drives both enumerations and both selector calls.
              var intermediate = await collectionSelector(item, position, cancellationToken).ConfigureAwait(false);

              await foreach (var innerItem in intermediate.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return await resultSelector(item, innerItem, cancellationToken).ConfigureAwait(false);
              }
          }
      }
  }
  365. #endif
  366. private sealed class SelectManyAsyncIterator<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  367. {
  368. private const int State_Source = 1;
  369. private const int State_Result = 2;
  370. private readonly Func<TSource, IAsyncEnumerable<TResult>> _selector;
  371. private readonly IAsyncEnumerable<TSource> _source;
  372. private int _mode;
  373. private IAsyncEnumerator<TResult>? _resultEnumerator;
  374. private IAsyncEnumerator<TSource>? _sourceEnumerator;
  /// <summary>
  /// Creates an iterator over <paramref name="source"/> that uses <paramref name="selector"/> to obtain the inner sequence for each source element.
  /// </summary>
  /// <param name="source">The sequence whose elements are projected.</param>
  /// <param name="selector">The one-to-many transform applied to each source element.</param>
  public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
  {
      // No validation here: both values are stored as given.
      _selector = selector;
      _source = source;
  }
  /// <summary>
  /// Creates a new iterator over the same source and selector; enumerator state is not copied.
  /// </summary>
  /// <returns>A new <see cref="SelectManyAsyncIterator{TSource, TResult}"/> built from this instance's source and selector.</returns>
  public override AsyncIteratorBase<TResult> Clone() =>
      new SelectManyAsyncIterator<TSource, TResult>(_source, _selector);
  /// <summary>
  /// Disposes the current inner (result) enumerator, then the source enumerator, and finally the base iterator.
  /// </summary>
  /// <returns>A task-like value that completes once all three disposals have finished.</returns>
  public override async ValueTask DisposeAsync()
  {
      // Inner enumerator first: it was obtained after the source enumerator.
      if (_resultEnumerator is { } innerEnumerator)
      {
          await innerEnumerator.DisposeAsync().ConfigureAwait(false);
          _resultEnumerator = null;
      }

      if (_sourceEnumerator is { } outerEnumerator)
      {
          await outerEnumerator.DisposeAsync().ConfigureAwait(false);
          _sourceEnumerator = null;
      }

      await base.DisposeAsync().ConfigureAwait(false);
  }
  398. public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
  399. {
  400. if (onlyIfCheap)
  401. {
  402. return new ValueTask<int>(-1);
  403. }
  404. return Core(cancellationToken);
  405. async ValueTask<int> Core(CancellationToken cancellationToken)
  406. {
  407. var count = 0;
  408. await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
  409. {
  410. checked
  411. {
  412. count += await _selector(element).CountAsync().ConfigureAwait(false);
  413. }
  414. }
  415. return count;
  416. }
  417. }
  418. public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
  419. {
  420. // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.
  421. var list = await ToListAsync(cancellationToken).ConfigureAwait(false);
  422. return list.ToArray();
  423. }
  424. public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
  425. {
  426. var list = new List<TResult>();
  427. await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
  428. {
  429. var items = _selector(element);
  430. await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
  431. }
  432. return list;
  433. }
  434. protected override async ValueTask<bool> MoveNextCore()
  435. {
  436. switch (_state)
  437. {
  438. case AsyncIteratorState.Allocated:
  439. _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
  440. _mode = State_Source;
  441. _state = AsyncIteratorState.Iterating;
  442. goto case AsyncIteratorState.Iterating;
  443. case AsyncIteratorState.Iterating:
  444. switch (_mode)
  445. {
  446. case State_Source:
  447. if (await _sourceEnumerator!.MoveNextAsync().ConfigureAwait(false))
  448. {
  449. if (_resultEnumerator != null)
  450. {
  451. await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
  452. }
  453. var inner = _selector(_sourceEnumerator.Current);
  454. _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);
  455. _mode = State_Result;
  456. goto case State_Result;
  457. }
  458. break;
  459. case State_Result:
  460. if (await _resultEnumerator!.MoveNextAsync().ConfigureAwait(false))
  461. {
  462. _current = _resultEnumerator.Current;
  463. return true;
  464. }
  465. _mode = State_Source;
  466. goto case State_Source; // loop
  467. }
  468. break;
  469. }
  470. await DisposeAsync().ConfigureAwait(false);
  471. return false;
  472. }
  473. }
  /// <summary>
  /// Flattening iterator whose selector is asynchronous: every source element is mapped, via an
  /// awaited <c>ValueTask</c>, to an inner async sequence, and the elements of those inner sequences
  /// are yielded one after another.
  /// </summary>
  /// <remarks>
  /// The count / list / array operations enumerate the source directly with <c>await foreach</c>
  /// rather than going through the state machine in <see cref="MoveNextCore"/>.
  /// <c>_state</c>, <c>_current</c> and <c>_cancellationToken</c> are not declared in this class;
  /// presumably they are inherited from <c>AsyncIterator&lt;TResult&gt;</c>.
  /// </remarks>
  private sealed class SelectManyAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult>? _resultEnumerator; // enumerator over the current inner sequence
      private IAsyncEnumerator<TSource>? _sourceEnumerator; // enumerator over the outer sequence

      /// <summary>Stores the outer sequence and the asynchronous selector; no enumeration starts here.</summary>
      public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector, sharing no enumeration state.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTask<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the source enumerator (if any), clears both
      /// references, and finally calls the base implementation.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Returns the total number of elements across all inner sequences, or -1 when
      /// <paramref name="onlyIfCheap"/> is set (the count always requires a full enumeration).
      /// </summary>
      /// <param name="onlyIfCheap">When true, no enumeration is performed and -1 is returned.</param>
      /// <param name="cancellationToken">Token observed while enumerating the source and while counting each inner sequence.</param>
      /// <returns>The summed count; the addition is overflow-checked.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          async ValueTask<int> Core(CancellationToken cancellationToken)
          {
              var count = 0;

              await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var items = await _selector(element).ConfigureAwait(false);

                  checked
                  {
                      // Flow the token into the inner count as well; otherwise a long inner
                      // sequence could not be cancelled once counting has started.
                      count += await items.CountAsync(cancellationToken).ConfigureAwait(false);
                  }
              }

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.
          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);
          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence into a list, appending each inner sequence in source order.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = await _selector(element).ConfigureAwait(false);
              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances the two-level state machine. Returns true after storing the next inner element in
      /// <c>_current</c>; when the source has no more elements (or the iterator is in neither the
      /// Allocated nor the Iterating state) it disposes the iterator and returns false.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference immediately: if the selector or GetAsyncEnumerator
                                  // below throws, a later DisposeAsync must not dispose this
                                  // already-disposed inner enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = await _selector(_sourceEnumerator.Current).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Source exhausted: leave the switch and dispose below.
                          break;

                      case State_Result:
                          if (await _resultEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back and pull the next source element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  583. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Flattening iterator whose selector is asynchronous and cancellable: every source element is
  /// mapped, via an awaited <c>ValueTask</c> that also receives a cancellation token, to an inner
  /// async sequence, and the elements of those inner sequences are yielded one after another.
  /// </summary>
  /// <remarks>
  /// The count / list / array operations enumerate the source directly with <c>await foreach</c>
  /// rather than going through the state machine in <see cref="MoveNextCore"/>.
  /// <c>_state</c>, <c>_current</c> and <c>_cancellationToken</c> are not declared in this class;
  /// presumably they are inherited from <c>AsyncIterator&lt;TResult&gt;</c>.
  /// </remarks>
  private sealed class SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult>? _resultEnumerator; // enumerator over the current inner sequence
      private IAsyncEnumerator<TSource>? _sourceEnumerator; // enumerator over the outer sequence

      /// <summary>Stores the outer sequence and the cancellable asynchronous selector; no enumeration starts here.</summary>
      public SelectManyAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector, sharing no enumeration state.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the source enumerator (if any), clears both
      /// references, and finally calls the base implementation.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Returns the total number of elements across all inner sequences, or -1 when
      /// <paramref name="onlyIfCheap"/> is set (the count always requires a full enumeration).
      /// </summary>
      /// <param name="onlyIfCheap">When true, no enumeration is performed and -1 is returned.</param>
      /// <param name="cancellationToken">Token passed to the source enumeration, to the selector, and to the count of each inner sequence.</param>
      /// <returns>The summed count; the addition is overflow-checked.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          async ValueTask<int> Core(CancellationToken cancellationToken)
          {
              var count = 0;

              await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var items = await _selector(element, cancellationToken).ConfigureAwait(false);

                  checked
                  {
                      // Flow the token into the inner count as well, matching the selector call
                      // above; otherwise a long inner sequence could not be cancelled once
                      // counting has started.
                      count += await items.CountAsync(cancellationToken).ConfigureAwait(false);
                  }
              }

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.
          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);
          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence into a list, appending each inner sequence in source order.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = await _selector(element, cancellationToken).ConfigureAwait(false);
              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances the two-level state machine. Returns true after storing the next inner element in
      /// <c>_current</c>; when the source has no more elements (or the iterator is in neither the
      /// Allocated nor the Iterating state) it disposes the iterator and returns false.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference immediately: if the selector or GetAsyncEnumerator
                                  // below throws, a later DisposeAsync must not dispose this
                                  // already-disposed inner enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Source exhausted: leave the switch and dispose below.
                          break;

                      case State_Result:
                          if (await _resultEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back and pull the next source element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  693. #endif
  694. }
  695. }