SelectMany.cs 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  /// <summary>
  /// Maps every element of an async-enumerable sequence to an inner async-enumerable sequence and exposes the elements of all inner sequences as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the projected inner sequences, and therefore of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Function that produces the inner sequence for a source element.</param>
  /// <returns>An async-enumerable sequence containing the elements of the inner sequences obtained by applying <paramref name="selector"/> to each source element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // The dedicated iterator type also implements IAsyncIListProvider<TResult>.
      return new SelectManyAsyncIterator<TSource, TResult>(source, selector);
  }
  // REVIEW: Should we keep these overloads that return ValueTask<IAsyncEnumerable<TResult>>? One could argue the selector is async twice.

  /// <summary>
  /// Flattening projection whose selector produces the inner sequence for each source element asynchronously.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous function that produces the inner sequence for a source element.</param>
  /// <returns>An async-enumerable sequence containing the elements of the inner sequences produced by <paramref name="selector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIteratorWithTask<TSource, TResult>(source, selector);
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Flattening projection whose selector produces the inner sequence for each source element asynchronously and receives a cancellation token.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous, cancellable function that produces the inner sequence for a source element.</param>
  /// <returns>An async-enumerable sequence containing the elements of the inner sequences produced by <paramref name="selector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector);
  }
  #endif
  /// <summary>
  /// Maps every element of an async-enumerable sequence, together with its zero-based index, to an inner async-enumerable sequence and exposes the elements of all inner sequences as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the projected inner sequences, and therefore of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Function that produces the inner sequence for a source element; its second argument is the index of that element in the source.</param>
  /// <returns>An async-enumerable sequence containing the elements of the inner sequences obtained by applying <paramref name="selector"/> to each source element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

  #if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core();

      async IAsyncEnumerable<TResult> Core([System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  #else
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  #endif
      {
          // Starts below zero so that the first increment hands index 0 to the selector.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Checked arithmetic: an index that no longer fits in an int throws instead of wrapping around.
              position = checked(position + 1);

              await foreach (var innerItem in selector(item, position).WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return innerItem;
              }
          }
      }
  }
  /// <summary>
  /// Index-aware flattening projection whose selector produces the inner sequence for each source element asynchronously.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous function that produces the inner sequence for a source element; its second argument is the index of that element in the source.</param>
  /// <returns>An async-enumerable sequence containing the elements of the inner sequences produced by <paramref name="selector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

  #if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core();

      async IAsyncEnumerable<TResult> Core([System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  #else
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  #endif
      {
          // Starts below zero so that the first increment hands index 0 to the selector.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Checked arithmetic: an index that no longer fits in an int throws instead of wrapping around.
              position = checked(position + 1);

              var innerSequence = await selector(item, position).ConfigureAwait(false);

              await foreach (var innerItem in innerSequence.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return innerItem;
              }
          }
      }
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Index-aware flattening projection whose selector produces the inner sequence for each source element asynchronously and receives the enumeration's cancellation token.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the resulting sequence.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous, cancellable function that produces the inner sequence for a source element; its second argument is the index of that element in the source.</param>
  /// <returns>An async-enumerable sequence containing the elements of the inner sequences produced by <paramref name="selector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

  #if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core();

      async IAsyncEnumerable<TResult> Core([System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  #else
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  #endif
      {
          // Starts below zero so that the first increment hands index 0 to the selector.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Checked arithmetic: an index that no longer fits in an int throws instead of wrapping around.
              position = checked(position + 1);

              // The same token drives the source, the selector and the inner sequence.
              var innerSequence = await selector(item, position, cancellationToken).ConfigureAwait(false);

              await foreach (var innerItem in innerSequence.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return innerItem;
              }
          }
      }
  }
  #endif
  /// <summary>
  /// Maps every element of an async-enumerable sequence to an intermediate async-enumerable sequence, combines the source element with each intermediate element through a result selector, and exposes the combined values as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence, as produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Function that produces the intermediate sequence for a source element.</param>
  /// <param name="resultSelector">Function that combines a source element with one element of its intermediate sequence.</param>
  /// <returns>An async-enumerable sequence of the values returned by <paramref name="resultSelector"/> for every pair of source element and intermediate element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

  #if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core();

      async IAsyncEnumerable<TResult> Core([System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  #else
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  #endif
      {
          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              await foreach (var innerItem in collectionSelector(outer).WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = resultSelector(outer, innerItem);
                  yield return combined;
              }
          }
      }
  }
  /// <summary>
  /// Flattening projection with asynchronous selectors: the intermediate sequence for each source element and the combined result for each pair are both awaited.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence, as produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous function that produces the intermediate sequence for a source element.</param>
  /// <param name="resultSelector">Asynchronous function that combines a source element with one element of its intermediate sequence.</param>
  /// <returns>An async-enumerable sequence of the values produced by <paramref name="resultSelector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

  #if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core();

      async IAsyncEnumerable<TResult> Core([System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  #else
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  #endif
      {
          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var innerSequence = await collectionSelector(outer).ConfigureAwait(false);

              await foreach (var innerItem in innerSequence.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, innerItem).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Flattening projection with asynchronous, cancellable selectors: the intermediate sequence for each source element and the combined result for each pair are both awaited, and both selectors receive the enumeration's cancellation token.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence, as produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous, cancellable function that produces the intermediate sequence for a source element.</param>
  /// <param name="resultSelector">Asynchronous, cancellable function that combines a source element with one element of its intermediate sequence.</param>
  /// <returns>An async-enumerable sequence of the values produced by <paramref name="resultSelector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

  #if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core();

      async IAsyncEnumerable<TResult> Core([System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  #else
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  #endif
      {
          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var innerSequence = await collectionSelector(outer, cancellationToken).ConfigureAwait(false);

              await foreach (var innerItem in innerSequence.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, innerItem, cancellationToken).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
  }
  #endif
  /// <summary>
  /// Maps every element of an async-enumerable sequence, together with its zero-based index, to an intermediate async-enumerable sequence, combines the source element with each intermediate element through a result selector, and exposes the combined values as one async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence, as produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Function that produces the intermediate sequence for a source element; its second argument is the index of that element in the source.</param>
  /// <param name="resultSelector">Function that combines a source element (first argument) with one element of its intermediate sequence (second argument).</param>
  /// <returns>An async-enumerable sequence of the values returned by <paramref name="resultSelector"/> for every pair of source element and intermediate element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

  #if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core();

      async IAsyncEnumerable<TResult> Core([System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  #else
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  #endif
      {
          // Starts below zero so that the first increment hands index 0 to the collection selector.
          var position = -1;

          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Checked arithmetic: an index that no longer fits in an int throws instead of wrapping around.
              position = checked(position + 1);

              await foreach (var innerItem in collectionSelector(outer, position).WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = resultSelector(outer, innerItem);
                  yield return combined;
              }
          }
      }
  }
  /// <summary>
  /// Index-aware flattening projection with asynchronous selectors: the intermediate sequence for each source element and the combined result for each pair are both awaited.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence, as produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous function that produces the intermediate sequence for a source element; its second argument is the index of that element in the source.</param>
  /// <param name="resultSelector">Asynchronous function that combines a source element with one element of its intermediate sequence.</param>
  /// <returns>An async-enumerable sequence of the values produced by <paramref name="resultSelector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

  #if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core();

      async IAsyncEnumerable<TResult> Core([System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  #else
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  #endif
      {
          // Starts below zero so that the first increment hands index 0 to the collection selector.
          var position = -1;

          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Checked arithmetic: an index that no longer fits in an int throws instead of wrapping around.
              position = checked(position + 1);

              var innerSequence = await collectionSelector(outer, position).ConfigureAwait(false);

              await foreach (var innerItem in innerSequence.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, innerItem).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Index-aware flattening projection with asynchronous, cancellable selectors: the intermediate sequence for each source element and the combined result for each pair are both awaited, and both selectors receive the enumeration's cancellation token.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence, as produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">The async-enumerable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous, cancellable function that produces the intermediate sequence for a source element; its second argument is the index of that element in the source.</param>
  /// <param name="resultSelector">Asynchronous, cancellable function that combines a source element with one element of its intermediate sequence.</param>
  /// <returns>An async-enumerable sequence of the values produced by <paramref name="resultSelector"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

  #if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core();

      async IAsyncEnumerable<TResult> Core([System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
  #else
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  #endif
      {
          // Starts below zero so that the first increment hands index 0 to the collection selector.
          var position = -1;

          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Checked arithmetic: an index that no longer fits in an int throws instead of wrapping around.
              position = checked(position + 1);

              var innerSequence = await collectionSelector(outer, position, cancellationToken).ConfigureAwait(false);

              await foreach (var innerItem in innerSequence.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, innerItem, cancellationToken).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
  }
  #endif
  /// <summary>
  /// Iterator behind the non-indexed <c>SelectMany</c> overload: it walks the source sequence, maps each element to an inner sequence with a synchronous selector, and yields the elements of one inner sequence after another.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of this iterator.</typeparam>
  private sealed class SelectManyAsyncIterator<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode while iterating: advance the source enumerator next, or the current inner enumerator.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, IAsyncEnumerable<TResult>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult>? _resultEnumerator;
      private IAsyncEnumerator<TSource>? _sourceEnumerator;

      /// <summary>
      /// Creates an iterator over <paramref name="source"/> that flattens the sequences returned by <paramref name="selector"/>.
      /// </summary>
      public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      /// <summary>
      /// Returns a fresh iterator over the same source and selector, carrying no enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIterator<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator first, then the source enumerator, then the base iterator.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the elements of the flattened sequence by enumerating the source and counting every inner sequence.
      /// </summary>
      /// <param name="onlyIfCheap">When true, no enumeration is performed and -1 is returned.</param>
      /// <param name="cancellationToken">Token observed while enumerating the source and while counting each inner sequence.</param>
      /// <returns>The total number of elements, or -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          async ValueTask<int> Core(CancellationToken cancellationToken)
          {
              var count = 0;

              await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  // Forward the token so that counting a long inner sequence can be cancelled too.
                  var innerCount = await _selector(element).CountAsync(cancellationToken).ConfigureAwait(false);

                  // Checked arithmetic: a total that exceeds int.MaxValue throws instead of wrapping around.
                  checked
                  {
                      count += innerCount;
                  }
              }

              return count;
          }
      }

      /// <summary>
      /// Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.
      /// </summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>
      /// Materializes the flattened sequence into a list, appending each inner sequence in source order.
      /// </summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = _selector(element);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances to the next element of the flattened sequence, moving on to the next source element whenever the current inner sequence is exhausted.
      /// </summary>
      /// <returns>True when <c>_current</c> holds a new element; false once the source is exhausted, after this iterator has been disposed.</returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the selector or GetAsyncEnumerator below throws,
                                  // a later DisposeAsync must not dispose this enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = _selector(_sourceEnumerator.Current);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back and fetch the next source element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator behind the non-indexed <c>SelectManyAwaitCore</c> overload: it walks the source sequence, awaits the selector to obtain the inner sequence for each element, and yields the elements of one inner sequence after another.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of this iterator.</typeparam>
  private sealed class SelectManyAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode while iterating: advance the source enumerator next, or the current inner enumerator.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult>? _resultEnumerator;
      private IAsyncEnumerator<TSource>? _sourceEnumerator;

      /// <summary>
      /// Creates an iterator over <paramref name="source"/> that flattens the sequences asynchronously returned by <paramref name="selector"/>.
      /// </summary>
      public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          _source = source;
          _selector = selector;
      }

      /// <summary>
      /// Returns a fresh iterator over the same source and selector, carrying no enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTask<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator first, then the source enumerator, then the base iterator.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the elements of the flattened sequence by enumerating the source and counting every inner sequence.
      /// </summary>
      /// <param name="onlyIfCheap">When true, no enumeration is performed and -1 is returned.</param>
      /// <param name="cancellationToken">Token observed while enumerating the source and while counting each inner sequence.</param>
      /// <returns>The total number of elements, or -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          async ValueTask<int> Core(CancellationToken cancellationToken)
          {
              var count = 0;

              await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var items = await _selector(element).ConfigureAwait(false);

                  // Forward the token so that counting a long inner sequence can be cancelled too.
                  var innerCount = await items.CountAsync(cancellationToken).ConfigureAwait(false);

                  // Checked arithmetic: a total that exceeds int.MaxValue throws instead of wrapping around.
                  checked
                  {
                      count += innerCount;
                  }
              }

              return count;
          }
      }

      /// <summary>
      /// Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.
      /// </summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>
      /// Materializes the flattened sequence into a list, appending each inner sequence in source order.
      /// </summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = await _selector(element).ConfigureAwait(false);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances to the next element of the flattened sequence, moving on to the next source element whenever the current inner sequence is exhausted.
      /// </summary>
      /// <returns>True when <c>_current</c> holds a new element; false once the source is exhausted, after this iterator has been disposed.</returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the selector or GetAsyncEnumerator below throws,
                                  // a later DisposeAsync must not dispose this enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = await _selector(_sourceEnumerator.Current).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back and fetch the next source element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator behind the non-indexed <c>SelectManyAwaitWithCancellationCore</c> overload: it walks the source sequence, awaits the cancellable selector to obtain the inner sequence for each element, and yields the elements of one inner sequence after another.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of this iterator.</typeparam>
  private sealed class SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode while iterating: advance the source enumerator next, or the current inner enumerator.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult>? _resultEnumerator;
      private IAsyncEnumerator<TSource>? _sourceEnumerator;

      /// <summary>
      /// Creates an iterator over <paramref name="source"/> that flattens the sequences asynchronously returned by <paramref name="selector"/>.
      /// </summary>
      public SelectManyAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          _source = source;
          _selector = selector;
      }

      /// <summary>
      /// Returns a fresh iterator over the same source and selector, carrying no enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator first, then the source enumerator, then the base iterator.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the elements of the flattened sequence by enumerating the source and counting every inner sequence.
      /// </summary>
      /// <param name="onlyIfCheap">When true, no enumeration is performed and -1 is returned.</param>
      /// <param name="cancellationToken">Token observed while enumerating the source, passed to the selector, and observed while counting each inner sequence.</param>
      /// <returns>The total number of elements, or -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          async ValueTask<int> Core(CancellationToken cancellationToken)
          {
              var count = 0;

              await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var items = await _selector(element, cancellationToken).ConfigureAwait(false);

                  // Forward the token so that counting a long inner sequence can be cancelled too.
                  var innerCount = await items.CountAsync(cancellationToken).ConfigureAwait(false);

                  // Checked arithmetic: a total that exceeds int.MaxValue throws instead of wrapping around.
                  checked
                  {
                      count += innerCount;
                  }
              }

              return count;
          }
      }

      /// <summary>
      /// Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.
      /// </summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>
      /// Materializes the flattened sequence into a list, appending each inner sequence in source order.
      /// </summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = await _selector(element, cancellationToken).ConfigureAwait(false);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances to the next element of the flattened sequence, moving on to the next source element whenever the current inner sequence is exhausted.
      /// </summary>
      /// <returns>True when <c>_current</c> holds a new element; false once the source is exhausted, after this iterator has been disposed.</returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the selector or GetAsyncEnumerator below throws,
                                  // a later DisposeAsync must not dispose this enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator!.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back and fetch the next source element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  #endif
  670. }
  671. }