SelectMany.cs 68 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an inner async-enumerable sequence
  /// and exposes the inner sequences' elements as one flattened sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="selector"/>.</param>
  /// <param name="selector">Maps one outer element to the inner sequence to enumerate.</param>
  /// <returns>A <c>SelectManyAsyncIterator</c> wrapping both arguments; no enumeration happens here.</returns>
  /// <remarks>
  /// A null <paramref name="source"/> or <paramref name="selector"/> is rejected eagerly by throwing
  /// the exception produced by <c>Error.ArgumentNull</c>.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIterator<TSource, TResult>(source, selector);
  }
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an inner async-enumerable sequence
  /// obtained asynchronously, and exposes the inner sequences' elements as one flattened sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="selector"/>.</param>
  /// <param name="selector">Asynchronously maps one outer element to the inner sequence to enumerate.</param>
  /// <returns>A <c>SelectManyAsyncIteratorWithTask</c> wrapping both arguments; no enumeration happens here.</returns>
  /// <remarks>
  /// A null <paramref name="source"/> or <paramref name="selector"/> is rejected eagerly by throwing
  /// the exception produced by <c>Error.ArgumentNull</c>.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIteratorWithTask<TSource, TResult>(source, selector);
  }
  28. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an inner async-enumerable sequence
  /// obtained asynchronously by a cancellable selector, and exposes the inner sequences' elements
  /// as one flattened sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="selector"/>.</param>
  /// <param name="selector">Asynchronously maps one outer element, plus a cancellation token, to the inner sequence to enumerate.</param>
  /// <returns>A <c>SelectManyAsyncIteratorWithTaskAndCancellation</c> wrapping both arguments; no enumeration happens here.</returns>
  /// <remarks>
  /// A null <paramref name="source"/> or <paramref name="selector"/> is rejected eagerly by throwing
  /// the exception produced by <c>Error.ArgumentNull</c>.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector);
  }
  37. #endif
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position,
  /// to an inner async-enumerable sequence and flattens the inner sequences into one sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="selector"/>.</param>
  /// <param name="selector">Maps an outer element and its index to the inner sequence to enumerate.</param>
  /// <returns>A lazily evaluated flattened sequence.</returns>
  /// <remarks>
  /// A null argument is rejected eagerly by throwing the exception produced by <c>Error.ArgumentNull</c>.
  /// In the <c>USE_ASYNC_ITERATOR</c> build the index is advanced with checked arithmetic, so it
  /// throws on overflow rather than wrapping around.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(FlattenAsync);

      async IAsyncEnumerator<TResult> FlattenAsync(CancellationToken token)
      {
          // Starts below zero so the first outer element is reported at position 0.
          var position = -1;

          await foreach (var outer in source.WithCancellation(token).ConfigureAwait(false))
          {
              position = checked(position + 1);

              var nested = selector(outer, position);

              await foreach (var item in nested.WithCancellation(token).ConfigureAwait(false))
              {
                  yield return item;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIterator<TSource, TResult>(source, selector);
#endif
  }
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position,
  /// to an inner async-enumerable sequence obtained asynchronously, and flattens the inner
  /// sequences into one sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="selector"/>.</param>
  /// <param name="selector">Asynchronously maps an outer element and its index to the inner sequence to enumerate.</param>
  /// <returns>A lazily evaluated flattened sequence.</returns>
  /// <remarks>
  /// A null argument is rejected eagerly by throwing the exception produced by <c>Error.ArgumentNull</c>.
  /// In the <c>USE_ASYNC_ITERATOR</c> build the index is advanced with checked arithmetic, so it
  /// throws on overflow rather than wrapping around.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(FlattenAsync);

      async IAsyncEnumerator<TResult> FlattenAsync(CancellationToken token)
      {
          // Starts below zero so the first outer element is reported at position 0.
          var position = -1;

          await foreach (var outer in source.WithCancellation(token).ConfigureAwait(false))
          {
              position = checked(position + 1);

              var nested = await selector(outer, position).ConfigureAwait(false);

              await foreach (var item in nested.WithCancellation(token).ConfigureAwait(false))
              {
                  yield return item;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult>(source, selector);
#endif
  }
  94. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position,
  /// to an inner async-enumerable sequence obtained asynchronously by a cancellable selector,
  /// and flattens the inner sequences into one sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="selector"/>.</param>
  /// <param name="selector">Asynchronously maps an outer element, its index and a cancellation token to the inner sequence to enumerate.</param>
  /// <returns>A lazily evaluated flattened sequence.</returns>
  /// <remarks>
  /// A null argument is rejected eagerly by throwing the exception produced by <c>Error.ArgumentNull</c>.
  /// In the <c>USE_ASYNC_ITERATOR</c> build the enumeration's token is passed to the selector and to
  /// both enumerations, and the index is advanced with checked arithmetic.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(FlattenAsync);

      async IAsyncEnumerator<TResult> FlattenAsync(CancellationToken token)
      {
          // Starts below zero so the first outer element is reported at position 0.
          var position = -1;

          await foreach (var outer in source.WithCancellation(token).ConfigureAwait(false))
          {
              position = checked(position + 1);

              var nested = await selector(outer, position, token).ConfigureAwait(false);

              await foreach (var item in nested.WithCancellation(token).ConfigureAwait(false))
              {
                  yield return item;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector);
#endif
  }
  123. #endif
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an intermediate async-enumerable
  /// sequence, then combines every intermediate element with the outer element that produced it
  /// and yields the combined values as one flattened sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="collectionSelector"/>.</param>
  /// <param name="collectionSelector">Maps one outer element to its intermediate sequence.</param>
  /// <param name="resultSelector">Combines an outer element with one element of its intermediate sequence.</param>
  /// <returns>A lazily evaluated flattened sequence of combined values.</returns>
  /// <remarks>
  /// A null argument is rejected eagerly by throwing the exception produced by <c>Error.ArgumentNull</c>.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(FlattenAsync);

      async IAsyncEnumerator<TResult> FlattenAsync(CancellationToken token)
      {
          await foreach (var outer in source.WithCancellation(token).ConfigureAwait(false))
          {
              var nested = collectionSelector(outer);

              await foreach (var item in nested.WithCancellation(token).ConfigureAwait(false))
              {
                  yield return resultSelector(outer, item);
              }
          }
      }
#else
      return new SelectManyAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an intermediate async-enumerable
  /// sequence obtained asynchronously, then asynchronously combines every intermediate element
  /// with the outer element that produced it and yields the combined values as one flattened sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="collectionSelector"/>.</param>
  /// <param name="collectionSelector">Asynchronously maps one outer element to its intermediate sequence.</param>
  /// <param name="resultSelector">Asynchronously combines an outer element with one element of its intermediate sequence.</param>
  /// <returns>A lazily evaluated flattened sequence of combined values.</returns>
  /// <remarks>
  /// A null argument is rejected eagerly by throwing the exception produced by <c>Error.ArgumentNull</c>.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(FlattenAsync);

      async IAsyncEnumerator<TResult> FlattenAsync(CancellationToken token)
      {
          await foreach (var outer in source.WithCancellation(token).ConfigureAwait(false))
          {
              var nested = await collectionSelector(outer).ConfigureAwait(false);

              await foreach (var item in nested.WithCancellation(token).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, item).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
#else
      return new SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  174. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an intermediate async-enumerable
  /// sequence obtained by a cancellable asynchronous selector, then combines every intermediate
  /// element with the outer element that produced it (also cancellable and asynchronous) and
  /// yields the combined values as one flattened sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="collectionSelector"/>.</param>
  /// <param name="collectionSelector">Asynchronously maps one outer element and a cancellation token to its intermediate sequence.</param>
  /// <param name="resultSelector">Asynchronously combines an outer element, one intermediate element and a cancellation token into a result.</param>
  /// <returns>A lazily evaluated flattened sequence of combined values.</returns>
  /// <remarks>
  /// A null argument is rejected eagerly by throwing the exception produced by <c>Error.ArgumentNull</c>.
  /// In the <c>USE_ASYNC_ITERATOR</c> build the enumeration's token is passed to both selectors and
  /// to both enumerations.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(FlattenAsync);

      async IAsyncEnumerator<TResult> FlattenAsync(CancellationToken token)
      {
          await foreach (var outer in source.WithCancellation(token).ConfigureAwait(false))
          {
              var nested = await collectionSelector(outer, token).ConfigureAwait(false);

              await foreach (var item in nested.WithCancellation(token).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, item, token).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
#else
      return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  200. #endif
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position,
  /// to an intermediate async-enumerable sequence, then combines every intermediate element with
  /// the outer element that produced it and yields the combined values as one flattened sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="collectionSelector"/>.</param>
  /// <param name="collectionSelector">Maps an outer element and its index to its intermediate sequence.</param>
  /// <param name="resultSelector">Combines an outer element with one element of its intermediate sequence.</param>
  /// <returns>A lazily evaluated flattened sequence of combined values.</returns>
  /// <remarks>
  /// A null argument is rejected eagerly by throwing the exception produced by <c>Error.ArgumentNull</c>.
  /// In the <c>USE_ASYNC_ITERATOR</c> build the index is advanced with checked arithmetic, so it
  /// throws on overflow rather than wrapping around.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(FlattenAsync);

      async IAsyncEnumerator<TResult> FlattenAsync(CancellationToken token)
      {
          // Starts below zero so the first outer element is reported at position 0.
          var position = -1;

          await foreach (var outer in source.WithCancellation(token).ConfigureAwait(false))
          {
              position = checked(position + 1);

              var nested = collectionSelector(outer, position);

              await foreach (var item in nested.WithCancellation(token).ConfigureAwait(false))
              {
                  yield return resultSelector(outer, item);
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position,
  /// to an intermediate async-enumerable sequence obtained asynchronously, then asynchronously
  /// combines every intermediate element with the outer element that produced it and yields the
  /// combined values as one flattened sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="collectionSelector"/>.</param>
  /// <param name="collectionSelector">Asynchronously maps an outer element and its index to its intermediate sequence.</param>
  /// <param name="resultSelector">Asynchronously combines an outer element with one element of its intermediate sequence.</param>
  /// <returns>A lazily evaluated flattened sequence of combined values.</returns>
  /// <remarks>
  /// A null argument is rejected eagerly by throwing the exception produced by <c>Error.ArgumentNull</c>.
  /// In the <c>USE_ASYNC_ITERATOR</c> build the index is advanced with checked arithmetic, so it
  /// throws on overflow rather than wrapping around.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(FlattenAsync);

      async IAsyncEnumerator<TResult> FlattenAsync(CancellationToken token)
      {
          // Starts below zero so the first outer element is reported at position 0.
          var position = -1;

          await foreach (var outer in source.WithCancellation(token).ConfigureAwait(false))
          {
              position = checked(position + 1);

              var nested = await collectionSelector(outer, position).ConfigureAwait(false);

              await foreach (var item in nested.WithCancellation(token).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, item).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  261. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position,
  /// to an intermediate async-enumerable sequence obtained by a cancellable asynchronous selector,
  /// then combines every intermediate element with the outer element that produced it (also
  /// cancellable and asynchronous) and yields the combined values as one flattened sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the intermediate sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  /// <param name="source">Outer sequence whose elements are handed to <paramref name="collectionSelector"/>.</param>
  /// <param name="collectionSelector">Asynchronously maps an outer element, its index and a cancellation token to its intermediate sequence.</param>
  /// <param name="resultSelector">Asynchronously combines an outer element, one intermediate element and a cancellation token into a result.</param>
  /// <returns>A lazily evaluated flattened sequence of combined values.</returns>
  /// <remarks>
  /// A null argument is rejected eagerly by throwing the exception produced by <c>Error.ArgumentNull</c>.
  /// In the <c>USE_ASYNC_ITERATOR</c> build the enumeration's token is passed to both selectors and
  /// to both enumerations, and the index is advanced with checked arithmetic.
  /// </remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(FlattenAsync);

      async IAsyncEnumerator<TResult> FlattenAsync(CancellationToken token)
      {
          // Starts below zero so the first outer element is reported at position 0.
          var position = -1;

          await foreach (var outer in source.WithCancellation(token).ConfigureAwait(false))
          {
              position = checked(position + 1);

              var nested = await collectionSelector(outer, position, token).ConfigureAwait(false);

              await foreach (var item in nested.WithCancellation(token).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, item, token).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  292. #endif
  /// <summary>
  /// Iterator that flattens the inner sequences produced by a synchronous selector over an
  /// outer sequence. Also provides count / array / list materialization that enumerates the
  /// outer sequence directly instead of going through <see cref="MoveNextCore"/>.
  /// </summary>
  private sealed class SelectManyAsyncIterator<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, IAsyncEnumerable<TResult>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and the selector; both are asserted non-null in debug builds.</summary>
      public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector, with no enumeration state.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIterator<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator first, then the outer one, clearing each field after it
      /// has been disposed, and finally runs the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the flattened elements by summing the count of every inner sequence.
      /// </summary>
      /// <param name="onlyIfCheap">When true, no enumeration is attempted and -1 is returned.</param>
      /// <param name="cancellationToken">Token observed while enumerating the outer sequence and while counting each inner sequence.</param>
      /// <returns>The total element count, or -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          async ValueTask<int> Core(CancellationToken token)
          {
              var count = 0;

              await foreach (var element in _source.WithCancellation(token).ConfigureAwait(false))
              {
                  // checked: the running total throws on overflow instead of wrapping.
                  checked
                  {
                      // The token is forwarded so counting an inner sequence can be cancelled too.
                      count += await _selector(element).CountAsync(token).ConfigureAwait(false);
                  }
              }

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence as an array by building a list first.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence by appending every inner sequence to one list.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = _selector(element);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances to the next flattened element. Alternates between pulling an outer element
      /// (State_Source) and draining the inner sequence it maps to (State_Result). When the outer
      /// sequence is exhausted the iterator disposes itself and reports false.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the disposed enumerator so that a failure in the selector or in
                                  // GetAsyncEnumerator below cannot lead DisposeAsync to dispose it again.
                                  _resultEnumerator = null;
                              }

                              var inner = _selector(_sourceEnumerator.Current);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that flattens the inner sequences produced by an asynchronous
  /// (<c>ValueTask</c>-returning) selector over an outer sequence. Also provides count / array /
  /// list materialization that enumerates the outer sequence directly instead of going through
  /// <see cref="MoveNextCore"/>.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and the selector; both are asserted non-null in debug builds.</summary>
      public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector, with no enumeration state.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTask<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator first, then the outer one, clearing each field after it
      /// has been disposed, and finally runs the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the flattened elements by summing the count of every inner sequence.
      /// </summary>
      /// <param name="onlyIfCheap">When true, no enumeration is attempted and -1 is returned.</param>
      /// <param name="cancellationToken">Token observed while enumerating the outer sequence and while counting each inner sequence.</param>
      /// <returns>The total element count, or -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          async ValueTask<int> Core(CancellationToken token)
          {
              var count = 0;

              await foreach (var element in _source.WithCancellation(token).ConfigureAwait(false))
              {
                  var items = await _selector(element).ConfigureAwait(false);

                  // checked: the running total throws on overflow instead of wrapping.
                  checked
                  {
                      // The token is forwarded so counting an inner sequence can be cancelled too.
                      count += await items.CountAsync(token).ConfigureAwait(false);
                  }
              }

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence as an array by building a list first.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence by appending every inner sequence to one list.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = await _selector(element).ConfigureAwait(false);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances to the next flattened element. Alternates between pulling an outer element
      /// (State_Source) and draining the inner sequence it maps to (State_Result). When the outer
      /// sequence is exhausted the iterator disposes itself and reports false.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the disposed enumerator so that a failure in the selector or in
                                  // GetAsyncEnumerator below cannot lead DisposeAsync to dispose it again.
                                  _resultEnumerator = null;
                              }

                              var inner = await _selector(_sourceEnumerator.Current).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  514. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that flattens the inner sequences produced by an asynchronous, cancellable
  /// selector over an outer sequence. Also provides count / array / list materialization that
  /// enumerates the outer sequence directly instead of going through <see cref="MoveNextCore"/>.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and the selector; both are asserted non-null in debug builds.</summary>
      public SelectManyAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector, with no enumeration state.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator first, then the outer one, clearing each field after it
      /// has been disposed, and finally runs the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the flattened elements by summing the count of every inner sequence.
      /// </summary>
      /// <param name="onlyIfCheap">When true, no enumeration is attempted and -1 is returned.</param>
      /// <param name="cancellationToken">Token passed to the outer enumeration, to the selector and to the count of each inner sequence.</param>
      /// <returns>The total element count, or -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          // The parameter is deliberately not named like the _cancellationToken member used by
          // MoveNextCore, so it is obvious that the caller-supplied token is the one in effect here.
          async ValueTask<int> Core(CancellationToken token)
          {
              var count = 0;

              await foreach (var element in _source.WithCancellation(token).ConfigureAwait(false))
              {
                  var items = await _selector(element, token).ConfigureAwait(false);

                  // checked: the running total throws on overflow instead of wrapping.
                  checked
                  {
                      // The token is forwarded so counting an inner sequence can be cancelled too.
                      count += await items.CountAsync(token).ConfigureAwait(false);
                  }
              }

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence as an array by building a list first.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence by appending every inner sequence to one list.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = await _selector(element, cancellationToken).ConfigureAwait(false);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances to the next flattened element. Alternates between pulling an outer element
      /// (State_Source) and draining the inner sequence it maps to (State_Result). When the outer
      /// sequence is exhausted the iterator disposes itself and reports false.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the disposed enumerator so that a failure in the selector or in
                                  // GetAsyncEnumerator below cannot lead DisposeAsync to dispose it again.
                                  _resultEnumerator = null;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  626. #endif
  627. #if !USE_ASYNC_ITERATOR
  /// <summary>
  /// Iterator that maps each outer element to an intermediate sequence with a synchronous
  /// collection selector and yields the result selector's value for every
  /// (outer element, intermediate element) pair.
  /// </summary>
  private sealed class SelectManyAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, IAsyncEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose intermediate sequence is currently being drained.
      private TSource _currentSource;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and both selectors; all are asserted non-null in debug builds.</summary>
      public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>Creates a fresh iterator over the same source and selectors, with no enumeration state.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIterator<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the intermediate enumerator first, then the outer one, clearing each field
      /// after it has been disposed, resets the remembered outer element, and finally runs the
      /// base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next combined element. Alternates between pulling an outer element
      /// (State_Source) and draining the intermediate sequence it maps to (State_Result). When
      /// the outer sequence is exhausted the iterator disposes itself and reports false.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the disposed enumerator so that a failure in the selector or in
                                  // GetAsyncEnumerator below cannot lead DisposeAsync to dispose it again.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;
                              var inner = _collectionSelector(_currentSource);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultSelector(_currentSource, _resultEnumerator.Current);
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that maps each outer element to an intermediate sequence with an asynchronous
  /// collection selector and yields the asynchronous result selector's value for every
  /// (outer element, intermediate element) pair.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose intermediate sequence is currently being drained.
      private TSource _currentSource;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and both selectors; all are asserted non-null in debug builds.</summary>
      public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>Creates a fresh iterator over the same source and selectors, with no enumeration state.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the intermediate enumerator first, then the outer one, clearing each field
      /// after it has been disposed, resets the remembered outer element, and finally runs the
      /// base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next combined element. Alternates between pulling an outer element
      /// (State_Source) and draining the intermediate sequence it maps to (State_Result). When
      /// the outer sequence is exhausted the iterator disposes itself and reports false.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the disposed enumerator so that a failure in the selector or in
                                  // GetAsyncEnumerator below cannot lead DisposeAsync to dispose it again.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;
                              var inner = await _collectionSelector(_currentSource).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current).ConfigureAwait(false);
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  788. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that, for every element of the outer source, awaits a cancellable collection selector to obtain
  /// an inner sequence and yields the awaited result selector's projection of each
  /// (outer element, inner element) pair.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being enumerated.
      private TSource _currentSource;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Captures the outer source and the two selectors; enumeration starts on the first move-next.
      /// </summary>
      public SelectManyAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same source and selectors, carrying none of this instance's
      /// enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the outer enumerator (if any), clears the cached outer
      /// element and finally awaits the base implementation.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the flattened sequence by one element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> has been set to the next projected element; <c>false</c> once the
      /// outer source is exhausted (or the iterator is in neither the Allocated nor the Iterating state),
      /// after <see cref="DisposeAsync"/> has been awaited.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer source.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the field: if the selector or GetAsyncEnumerator below throws,
                                  // DisposeAsync must not dispose this enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;
                              var inner = await _collectionSelector(_currentSource, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer source exhausted: fall out to the dispose-and-finish path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted: go back and pull the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  869. #endif
  /// <summary>
  /// Iterator that, for every element of the outer source, invokes a synchronous collection selector with the
  /// element and its zero-based index to obtain an inner sequence, and yields the result selector's projection
  /// of each (outer element, inner element) pair.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, IAsyncEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being enumerated.
      private TSource _currentSource;

      // Index of the current outer element; reset to -1 when enumeration starts.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Captures the outer source and the two selectors; enumeration starts on the first move-next.
      /// </summary>
      public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same source and selectors, carrying none of this instance's
      /// enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the outer enumerator (if any), clears the cached outer
      /// element and finally awaits the base implementation.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the flattened sequence by one element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> has been set to the next projected element; <c>false</c> once the
      /// outer source is exhausted (or the iterator is in neither the Allocated nor the Iterating state),
      /// after <see cref="DisposeAsync"/> has been awaited.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer source.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the field: if the index increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose this
                                  // enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;

                              // Overflow of the element index is an error rather than a silent wrap-around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = _collectionSelector(_currentSource, _index);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer source exhausted: fall out to the dispose-and-finish path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultSelector(_currentSource, _resultEnumerator.Current);
                              return true;
                          }

                          // Inner sequence exhausted: go back and pull the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that, for every element of the outer source, awaits a collection selector given the element and
  /// its zero-based index to obtain an inner sequence, and yields the awaited result selector's projection of
  /// each (outer element, inner element) pair.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being enumerated.
      private TSource _currentSource;

      // Index of the current outer element; reset to -1 when enumeration starts.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Captures the outer source and the two selectors; enumeration starts on the first move-next.
      /// </summary>
      public SelectManyWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same source and selectors, carrying none of this instance's
      /// enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the outer enumerator (if any), clears the cached outer
      /// element and finally awaits the base implementation.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the flattened sequence by one element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> has been set to the next projected element; <c>false</c> once the
      /// outer source is exhausted (or the iterator is in neither the Allocated nor the Iterating state),
      /// after <see cref="DisposeAsync"/> has been awaited.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer source.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the field: if the index increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose this
                                  // enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;

                              // Overflow of the element index is an error rather than a silent wrap-around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _collectionSelector(_currentSource, _index).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer source exhausted: fall out to the dispose-and-finish path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted: go back and pull the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1042. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that, for every element of the outer source, awaits a cancellable collection selector given the
  /// element and its zero-based index to obtain an inner sequence, and yields the awaited cancellable result
  /// selector's projection of each (outer element, inner element) pair.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being enumerated.
      private TSource _currentSource;

      // Index of the current outer element; reset to -1 when enumeration starts.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Captures the outer source and the two selectors; enumeration starts on the first move-next.
      /// </summary>
      public SelectManyWithIndexAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same source and selectors, carrying none of this instance's
      /// enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the outer enumerator (if any), clears the cached outer
      /// element and finally awaits the base implementation.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the flattened sequence by one element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> has been set to the next projected element; <c>false</c> once the
      /// outer source is exhausted (or the iterator is in neither the Allocated nor the Iterating state),
      /// after <see cref="DisposeAsync"/> has been awaited.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer source.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the field: if the index increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose this
                                  // enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;

                              // Overflow of the element index is an error rather than a silent wrap-around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _collectionSelector(_currentSource, _index, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer source exhausted: fall out to the dispose-and-finish path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted: go back and pull the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1129. #endif
  /// <summary>
  /// Iterator that, for every element of the outer source, invokes a synchronous selector with the element and
  /// its zero-based index to obtain an inner sequence, and yields that inner sequence's elements unchanged.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, IAsyncEnumerable<TResult>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Index of the current outer element; reset to -1 when enumeration starts.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Captures the outer source and the selector; enumeration starts on the first move-next.
      /// </summary>
      public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same source and selector, carrying none of this instance's
      /// enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIterator<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the outer enumerator (if any), and finally awaits the
      /// base implementation.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the flattened sequence by one element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> has been set to the next inner element; <c>false</c> once the outer
      /// source is exhausted (or the iterator is in neither the Allocated nor the Iterating state), after
      /// <see cref="DisposeAsync"/> has been awaited.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer source.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the field: if the index increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose this
                                  // enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              // Overflow of the element index is an error rather than a silent wrap-around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = _selector(_sourceEnumerator.Current, _index);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer source exhausted: fall out to the dispose-and-finish path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back and pull the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that, for every element of the outer source, awaits a selector given the element and its
  /// zero-based index to obtain an inner sequence, and yields that inner sequence's elements unchanged.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Index of the current outer element; reset to -1 when enumeration starts.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Captures the outer source and the selector; enumeration starts on the first move-next.
      /// </summary>
      public SelectManyWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same source and selector, carrying none of this instance's
      /// enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the outer enumerator (if any), and finally awaits the
      /// base implementation.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the flattened sequence by one element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> has been set to the next inner element; <c>false</c> once the outer
      /// source is exhausted (or the iterator is in neither the Allocated nor the Iterating state), after
      /// <see cref="DisposeAsync"/> has been awaited.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer source.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the field: if the index increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose this
                                  // enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              // Overflow of the element index is an error rather than a silent wrap-around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _index).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer source exhausted: fall out to the dispose-and-finish path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back and pull the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1290. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that, for every element of the outer source, awaits a cancellable selector given the element and
  /// its zero-based index to obtain an inner sequence, and yields that inner sequence's elements unchanged.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step of MoveNextCore advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Index of the current outer element; reset to -1 when enumeration starts.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Captures the outer source and the selector; enumeration starts on the first move-next.
      /// </summary>
      public SelectManyWithIndexAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same source and selector, carrying none of this instance's
      /// enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the outer enumerator (if any), and finally awaits the
      /// base implementation.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the flattened sequence by one element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> has been set to the next inner element; <c>false</c> once the outer
      /// source is exhausted (or the iterator is in neither the Allocated nor the Iterating state), after
      /// <see cref="DisposeAsync"/> has been awaited.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer source.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the field: if the index increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose this
                                  // enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              // Overflow of the element index is an error rather than a silent wrap-around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _index, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer source exhausted: fall out to the dispose-and-finish path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back and pull the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1371. #endif
  1372. #endif
  1373. }
  1374. }