SelectMany.cs 69 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  /// <summary>
  /// Maps every element of <paramref name="source"/> to an inner async sequence and concatenates
  /// those inner sequences into a single flat async sequence.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="selector">Produces the inner sequence for an outer element.</param>
  /// <returns>A lazily evaluated sequence of all inner elements, in outer-then-inner order.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when either argument is null.</remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIterator<TSource, TResult>(source, selector);
  }
  20. // REVIEW: Should we keep these overloads that return ValueTask<IAsyncEnumerable<TResult>>? One could argue the selector is async twice.
  /// <summary>
  /// Flattening projection whose <paramref name="selector"/> asynchronously produces the inner
  /// sequence (a <see cref="ValueTask{TResult}"/> wrapping an async sequence) for each outer element.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronously produces the inner sequence for an outer element.</param>
  /// <returns>A lazily evaluated sequence of all inner elements.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when either argument is null.</remarks>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIteratorWithTask<TSource, TResult>(source, selector);
  }
  29. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Flattening projection whose <paramref name="selector"/> asynchronously produces the inner
  /// sequence and additionally receives a <see cref="CancellationToken"/>.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronously produces the inner sequence for an outer element and a token.</param>
  /// <returns>A lazily evaluated sequence of all inner elements.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when either argument is null.</remarks>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector);
  }
  38. #endif
  /// <summary>
  /// Maps every element of <paramref name="source"/>, together with its zero-based position, to an
  /// inner async sequence and concatenates those inner sequences into one flat async sequence.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="selector">Produces the inner sequence from an outer element and its index.</param>
  /// <returns>A lazily evaluated sequence of all inner elements.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when either argument is null.</remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts at -1 so the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var outer in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked increment of the element index.
              position = checked(position + 1);

              var nested = selector(outer, position);

              await foreach (var item in AsyncEnumerableExtensions.WithCancellation(nested, cancellationToken).ConfigureAwait(false))
              {
                  yield return item;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIterator<TSource, TResult>(source, selector);
#endif
  }
  /// <summary>
  /// Index-aware flattening projection whose <paramref name="selector"/> asynchronously produces
  /// the inner sequence for each outer element and its zero-based position.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronously produces the inner sequence from an outer element and its index.</param>
  /// <returns>A lazily evaluated sequence of all inner elements.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when either argument is null.</remarks>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts at -1 so the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var outer in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked increment of the element index.
              position = checked(position + 1);

              var nested = await selector(outer, position).ConfigureAwait(false);

              await foreach (var item in AsyncEnumerableExtensions.WithCancellation(nested, cancellationToken).ConfigureAwait(false))
              {
                  yield return item;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult>(source, selector);
#endif
  }
  95. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Index-aware flattening projection whose <paramref name="selector"/> asynchronously produces
  /// the inner sequence and is handed the enumeration's <see cref="CancellationToken"/>.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronously produces the inner sequence from an outer element, its index and a token.</param>
  /// <returns>A lazily evaluated sequence of all inner elements.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when either argument is null.</remarks>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts at -1 so the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var outer in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked increment of the element index.
              position = checked(position + 1);

              var nested = await selector(outer, position, cancellationToken).ConfigureAwait(false);

              await foreach (var item in AsyncEnumerableExtensions.WithCancellation(nested, cancellationToken).ConfigureAwait(false))
              {
                  yield return item;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector);
#endif
  }
  124. #endif
  /// <summary>
  /// Maps every element of <paramref name="source"/> to an inner async sequence, then combines each
  /// outer element with each of its inner elements via <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Produces the inner sequence for an outer element.</param>
  /// <param name="resultSelector">Builds a result from an outer element and one of its inner elements.</param>
  /// <returns>A lazily evaluated sequence of the combined results.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when any argument is null.</remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          await foreach (var outer in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
          {
              var nested = collectionSelector(outer);

              await foreach (var item in AsyncEnumerableExtensions.WithCancellation(nested, cancellationToken).ConfigureAwait(false))
              {
                  yield return resultSelector(outer, item);
              }
          }
      }
#else
      return new SelectManyAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  /// <summary>
  /// Flattening projection with asynchronous selectors: <paramref name="collectionSelector"/> is awaited
  /// to obtain the inner sequence and <paramref name="resultSelector"/> is awaited for every
  /// (outer, inner) pair to obtain the yielded value.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronously produces the inner sequence for an outer element.</param>
  /// <param name="resultSelector">Asynchronously builds a result from an outer element and one inner element.</param>
  /// <returns>A lazily evaluated sequence of the combined results.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when any argument is null.</remarks>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          await foreach (var outer in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
          {
              var nested = await collectionSelector(outer).ConfigureAwait(false);

              await foreach (var item in AsyncEnumerableExtensions.WithCancellation(nested, cancellationToken).ConfigureAwait(false))
              {
                  yield return await resultSelector(outer, item).ConfigureAwait(false);
              }
          }
      }
#else
      return new SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  175. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Flattening projection with asynchronous, cancellation-aware selectors: both
  /// <paramref name="collectionSelector"/> and <paramref name="resultSelector"/> are awaited and
  /// receive the enumeration's <see cref="CancellationToken"/>.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronously produces the inner sequence for an outer element and a token.</param>
  /// <param name="resultSelector">Asynchronously builds a result from an outer element, one inner element and a token.</param>
  /// <returns>A lazily evaluated sequence of the combined results.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when any argument is null.</remarks>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          await foreach (var outer in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
          {
              var nested = await collectionSelector(outer, cancellationToken).ConfigureAwait(false);

              await foreach (var item in AsyncEnumerableExtensions.WithCancellation(nested, cancellationToken).ConfigureAwait(false))
              {
                  yield return await resultSelector(outer, item, cancellationToken).ConfigureAwait(false);
              }
          }
      }
#else
      return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  201. #endif
  /// <summary>
  /// Maps every element of <paramref name="source"/>, together with its zero-based position, to an
  /// inner async sequence, then combines each outer element with each of its inner elements via
  /// <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Produces the inner sequence from an outer element and its index.</param>
  /// <param name="resultSelector">Builds a result from an outer element and one of its inner elements.</param>
  /// <returns>A lazily evaluated sequence of the combined results.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when any argument is null.</remarks>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts at -1 so the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var outer in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked increment of the element index.
              position = checked(position + 1);

              var nested = collectionSelector(outer, position);

              await foreach (var item in AsyncEnumerableExtensions.WithCancellation(nested, cancellationToken).ConfigureAwait(false))
              {
                  yield return resultSelector(outer, item);
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  /// <summary>
  /// Index-aware flattening projection with asynchronous selectors: <paramref name="collectionSelector"/>
  /// is awaited with the outer element and its zero-based position, and
  /// <paramref name="resultSelector"/> is awaited for every (outer, inner) pair.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronously produces the inner sequence from an outer element and its index.</param>
  /// <param name="resultSelector">Asynchronously builds a result from an outer element and one inner element.</param>
  /// <returns>A lazily evaluated sequence of the combined results.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when any argument is null.</remarks>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts at -1 so the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var outer in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked increment of the element index.
              position = checked(position + 1);

              var nested = await collectionSelector(outer, position).ConfigureAwait(false);

              await foreach (var item in AsyncEnumerableExtensions.WithCancellation(nested, cancellationToken).ConfigureAwait(false))
              {
                  yield return await resultSelector(outer, item).ConfigureAwait(false);
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  262. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Index-aware flattening projection with asynchronous, cancellation-aware selectors: both
  /// selectors are awaited and receive the enumeration's <see cref="CancellationToken"/>;
  /// <paramref name="collectionSelector"/> also receives the outer element's zero-based position.
  /// </summary>
  /// <param name="source">The outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronously produces the inner sequence from an outer element, its index and a token.</param>
  /// <param name="resultSelector">Asynchronously builds a result from an outer element, one inner element and a token.</param>
  /// <returns>A lazily evaluated sequence of the combined results.</returns>
  /// <remarks>Throws the exception built by <c>Error.ArgumentNull</c> when any argument is null.</remarks>
  internal static IAsyncEnumerable<TResult> SelectManyAwaitWithCancellationCore<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts at -1 so the first element observes index 0 after the increment.
          var position = -1;

          await foreach (var outer in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked increment of the element index.
              position = checked(position + 1);

              var nested = await collectionSelector(outer, position, cancellationToken).ConfigureAwait(false);

              await foreach (var item in AsyncEnumerableExtensions.WithCancellation(nested, cancellationToken).ConfigureAwait(false))
              {
                  yield return await resultSelector(outer, item, cancellationToken).ConfigureAwait(false);
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  293. #endif
  /// <summary>
  /// Iterator that maps each source element to an inner async sequence through a synchronous
  /// selector and yields the inner elements one after another. It also exposes count, array and
  /// list shortcuts through <see cref="IAsyncIListProvider{TResult}"/>.
  /// </summary>
  private sealed class SelectManyAsyncIterator<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: whether the next step advances the outer source or the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, IAsyncEnumerable<TResult>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and the selector; no enumeration starts here.</summary>
      public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIterator<TSource, TResult>(_source, _selector);
      }

      /// <summary>Disposes the inner enumerator (if any), then the outer enumerator (if any), then the base.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Returns -1 when <paramref name="onlyIfCheap"/> is set; otherwise enumerates the source and
      /// sums the element counts of every inner sequence using overflow-checked addition.
      /// </summary>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          // The parameter is named 'token' so it is not confused with the _cancellationToken member
          // used by MoveNextCore.
          async ValueTask<int> Core(CancellationToken token)
          {
              var count = 0;

              await foreach (var element in AsyncEnumerableExtensions.WithCancellation(_source, token).ConfigureAwait(false))
              {
                  // Pass the token to the inner count as well, so cancellation is also observed
                  // while an inner sequence is being counted (ToListAsync does the same for AddRangeAsync).
                  var innerCount = await _selector(element).CountAsync(token).ConfigureAwait(false);

                  checked
                  {
                      count += innerCount;
                  }
              }

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence into a list, appending each inner sequence in turn.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in AsyncEnumerableExtensions.WithCancellation(_source, cancellationToken).ConfigureAwait(false))
          {
              var items = _selector(element);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances the flattened sequence by one element. Alternates between pulling the next outer
      /// element (State_Source) and draining its inner sequence (State_Result); disposes itself and
      /// returns false once the outer source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the reference so that, if the selector or GetAsyncEnumerator
                                  // below throws, a later DisposeAsync does not dispose it a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = _selector(_sourceEnumerator.Current);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that maps each source element to an inner async sequence through an awaited
  /// (<see cref="ValueTask{TResult}"/>-returning) selector and yields the inner elements one after
  /// another. It also exposes count, array and list shortcuts through
  /// <see cref="IAsyncIListProvider{TResult}"/>.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: whether the next step advances the outer source or the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and the selector; no enumeration starts here.</summary>
      public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTask<TSource, TResult>(_source, _selector);
      }

      /// <summary>Disposes the inner enumerator (if any), then the outer enumerator (if any), then the base.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Returns -1 when <paramref name="onlyIfCheap"/> is set; otherwise enumerates the source and
      /// sums the element counts of every inner sequence using overflow-checked addition.
      /// </summary>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          // The parameter is named 'token' so it is not confused with the _cancellationToken member
          // used by MoveNextCore.
          async ValueTask<int> Core(CancellationToken token)
          {
              var count = 0;

              await foreach (var element in AsyncEnumerableExtensions.WithCancellation(_source, token).ConfigureAwait(false))
              {
                  var items = await _selector(element).ConfigureAwait(false);

                  // Pass the token to the inner count as well, so cancellation is also observed
                  // while an inner sequence is being counted (ToListAsync does the same for AddRangeAsync).
                  var innerCount = await items.CountAsync(token).ConfigureAwait(false);

                  checked
                  {
                      count += innerCount;
                  }
              }

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence into a list, appending each inner sequence in turn.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in AsyncEnumerableExtensions.WithCancellation(_source, cancellationToken).ConfigureAwait(false))
          {
              var items = await _selector(element).ConfigureAwait(false);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances the flattened sequence by one element. Alternates between pulling the next outer
      /// element (State_Source) and draining its inner sequence (State_Result); disposes itself and
      /// returns false once the outer source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the reference so that, if the selector or GetAsyncEnumerator
                                  // below throws, a later DisposeAsync does not dispose it a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = await _selector(_sourceEnumerator.Current).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  515. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that maps each source element to an inner async sequence through an awaited selector
  /// that also receives a <see cref="CancellationToken"/>, and yields the inner elements one after
  /// another. It also exposes count, array and list shortcuts through
  /// <see cref="IAsyncIListProvider{TResult}"/>.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: whether the next step advances the outer source or the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and the selector; no enumeration starts here.</summary>
      public SelectManyAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
      }

      /// <summary>Disposes the inner enumerator (if any), then the outer enumerator (if any), then the base.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Returns -1 when <paramref name="onlyIfCheap"/> is set; otherwise enumerates the source and
      /// sums the element counts of every inner sequence using overflow-checked addition.
      /// </summary>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          // The parameter is named 'token' so it is not confused with the _cancellationToken member
          // used by MoveNextCore.
          async ValueTask<int> Core(CancellationToken token)
          {
              var count = 0;

              await foreach (var element in AsyncEnumerableExtensions.WithCancellation(_source, token).ConfigureAwait(false))
              {
                  var items = await _selector(element, token).ConfigureAwait(false);

                  // Pass the token to the inner count as well, so cancellation is also observed
                  // while an inner sequence is being counted (ToListAsync does the same for AddRangeAsync).
                  var innerCount = await items.CountAsync(token).ConfigureAwait(false);

                  checked
                  {
                      count += innerCount;
                  }
              }

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence into a list, appending each inner sequence in turn.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

          await foreach (var element in AsyncEnumerableExtensions.WithCancellation(_source, cancellationToken).ConfigureAwait(false))
          {
              var items = await _selector(element, cancellationToken).ConfigureAwait(false);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }

          return list;
      }

      /// <summary>
      /// Advances the flattened sequence by one element. Alternates between pulling the next outer
      /// element (State_Source) and draining its inner sequence (State_Result); disposes itself and
      /// returns false once the outer source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the reference so that, if the selector or GetAsyncEnumerator
                                  // below throws, a later DisposeAsync does not dispose it a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  627. #endif
  628. #if !USE_ASYNC_ITERATOR
  /// <summary>
  /// Iterator that maps each source element to an inner async sequence through a synchronous
  /// collection selector and yields, for every inner element, the value produced by a synchronous
  /// result selector applied to the (outer, inner) pair.
  /// </summary>
  private sealed class SelectManyAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: whether the next step advances the outer source or the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, IAsyncEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // The outer element whose inner sequence is currently being drained.
      private TSource _currentSource;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and both selectors; no enumeration starts here.</summary>
      public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>Creates a fresh iterator over the same source and selectors.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIterator<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the outer enumerator (if any), resets the
      /// remembered outer element, and finally disposes the base.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the flattened sequence by one element. Alternates between pulling the next outer
      /// element (State_Source) and draining its inner sequence (State_Result); disposes itself and
      /// returns false once the outer source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the reference so that, if the selector or GetAsyncEnumerator
                                  // below throws, a later DisposeAsync does not dispose it a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;
                              var inner = _collectionSelector(_currentSource);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultSelector(_currentSource, _resultEnumerator.Current);
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that maps each source element to an inner async sequence through an awaited
  /// collection selector and yields, for every inner element, the value produced by an awaited
  /// result selector applied to the (outer, inner) pair.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: whether the next step advances the outer source or the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // The outer element whose inner sequence is currently being drained.
      private TSource _currentSource;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>Stores the outer sequence and both selectors; no enumeration starts here.</summary>
      public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>Creates a fresh iterator over the same source and selectors.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator (if any), then the outer enumerator (if any), resets the
      /// remembered outer element, and finally disposes the base.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the flattened sequence by one element. Alternates between pulling the next outer
      /// element (State_Source) and draining its inner sequence (State_Result); disposes itself and
      /// returns false once the outer source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the reference so that, if the selector or GetAsyncEnumerator
                                  // below throws, a later DisposeAsync does not dispose it a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;
                              var inner = await _collectionSelector(_currentSource).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current).ConfigureAwait(false);
                              return true;
                          }

                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  789. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Async iterator that, for every element of the outer sequence, awaits a collection selector
  /// (which receives the cancellation token) to obtain an inner sequence, and yields the awaited
  /// result selector's value for each (outer element, inner element) pair.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being enumerated.
      private TSource _currentSource;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Stores the outer sequence and the two selectors; arguments are checked with debug assertions only.
      /// </summary>
      public SelectManyAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates a new iterator over the same source and selectors; enumeration state is not copied.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator, then the outer enumerator (each only if present), clears the
      /// cached outer element and runs the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next result, alternating between pulling an outer element
      /// (<c>State_Source</c>) and pulling an element of its inner sequence (<c>State_Result</c>).
      /// </summary>
      /// <returns>
      /// <c>true</c> when a new value has been stored in <c>_current</c>; <c>false</c> once the
      /// outer sequence is exhausted (the iterator disposes itself before returning).
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer sequence.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the selector or GetAsyncEnumerator
                                  // below throws, a later DisposeAsync must not dispose this
                                  // already-disposed enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;
                              var inner = await _collectionSelector(_currentSource, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer sequence exhausted: fall out to the dispose-and-return-false path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted: go back for the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  870. #endif
  /// <summary>
  /// Async iterator that, for every element of the outer sequence, calls a synchronous collection
  /// selector with the element and its zero-based index to obtain an inner sequence, and yields the
  /// synchronous result selector's value for each (outer element, inner element) pair.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, IAsyncEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being enumerated.
      private TSource _currentSource;

      // Index of the current outer element; starts at -1 and is incremented before each selector call.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Stores the outer sequence and the two selectors; arguments are checked with debug assertions only.
      /// </summary>
      public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates a new iterator over the same source and selectors; enumeration state is not copied.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator, then the outer enumerator (each only if present), clears the
      /// cached outer element and runs the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next result, alternating between pulling an outer element
      /// (<c>State_Source</c>) and pulling an element of its inner sequence (<c>State_Result</c>).
      /// </summary>
      /// <returns>
      /// <c>true</c> when a new value has been stored in <c>_current</c>; <c>false</c> once the
      /// outer sequence is exhausted (the iterator disposes itself before returning).
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer sequence.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the index increment, the selector
                                  // or GetAsyncEnumerator below throws, a later DisposeAsync must not
                                  // dispose this already-disposed enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = _collectionSelector(_currentSource, _index);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer sequence exhausted: fall out to the dispose-and-return-false path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultSelector(_currentSource, _resultEnumerator.Current);
                              return true;
                          }

                          // Inner sequence exhausted: go back for the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Async iterator that, for every element of the outer sequence, awaits a collection selector
  /// called with the element and its zero-based index to obtain an inner sequence, and yields the
  /// awaited result selector's value for each (outer element, inner element) pair.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being enumerated.
      private TSource _currentSource;

      // Index of the current outer element; starts at -1 and is incremented before each selector call.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Stores the outer sequence and the two selectors; arguments are checked with debug assertions only.
      /// </summary>
      public SelectManyWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates a new iterator over the same source and selectors; enumeration state is not copied.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator, then the outer enumerator (each only if present), clears the
      /// cached outer element and runs the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next result, alternating between pulling an outer element
      /// (<c>State_Source</c>) and pulling an element of its inner sequence (<c>State_Result</c>).
      /// </summary>
      /// <returns>
      /// <c>true</c> when a new value has been stored in <c>_current</c>; <c>false</c> once the
      /// outer sequence is exhausted (the iterator disposes itself before returning).
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer sequence.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the index increment, the selector
                                  // or GetAsyncEnumerator below throws, a later DisposeAsync must not
                                  // dispose this already-disposed enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _collectionSelector(_currentSource, _index).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer sequence exhausted: fall out to the dispose-and-return-false path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted: go back for the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1043. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Async iterator that, for every element of the outer sequence, awaits a collection selector
  /// called with the element, its zero-based index and the cancellation token to obtain an inner
  /// sequence, and yields the awaited result selector's value for each (outer element, inner
  /// element) pair.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being enumerated.
      private TSource _currentSource;

      // Index of the current outer element; starts at -1 and is incremented before each selector call.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Stores the outer sequence and the two selectors; arguments are checked with debug assertions only.
      /// </summary>
      public SelectManyWithIndexAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates a new iterator over the same source and selectors; enumeration state is not copied.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      /// <summary>
      /// Disposes the inner enumerator, then the outer enumerator (each only if present), clears the
      /// cached outer element and runs the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next result, alternating between pulling an outer element
      /// (<c>State_Source</c>) and pulling an element of its inner sequence (<c>State_Result</c>).
      /// </summary>
      /// <returns>
      /// <c>true</c> when a new value has been stored in <c>_current</c>; <c>false</c> once the
      /// outer sequence is exhausted (the iterator disposes itself before returning).
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer sequence.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the index increment, the selector
                                  // or GetAsyncEnumerator below throws, a later DisposeAsync must not
                                  // dispose this already-disposed enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _collectionSelector(_currentSource, _index, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer sequence exhausted: fall out to the dispose-and-return-false path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted: go back for the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1130. #endif
  /// <summary>
  /// Async iterator that, for every element of the outer sequence, calls a synchronous selector
  /// with the element and its zero-based index to obtain an inner sequence, and yields the inner
  /// sequence's elements unchanged.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, IAsyncEnumerable<TResult>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Index of the current outer element; starts at -1 and is incremented before each selector call.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Stores the outer sequence and the selector; arguments are checked with debug assertions only.
      /// </summary>
      public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>
      /// Creates a new iterator over the same source and selector; enumeration state is not copied.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIterator<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator, then the outer enumerator (each only if present), and runs
      /// the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next result, alternating between pulling an outer element
      /// (<c>State_Source</c>) and pulling an element of its inner sequence (<c>State_Result</c>).
      /// </summary>
      /// <returns>
      /// <c>true</c> when a new value has been stored in <c>_current</c>; <c>false</c> once the
      /// outer sequence is exhausted (the iterator disposes itself before returning).
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer sequence.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the index increment, the selector
                                  // or GetAsyncEnumerator below throws, a later DisposeAsync must not
                                  // dispose this already-disposed enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = _selector(_sourceEnumerator.Current, _index);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer sequence exhausted: fall out to the dispose-and-return-false path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back for the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Async iterator that, for every element of the outer sequence, awaits a selector called with
  /// the element and its zero-based index to obtain an inner sequence, and yields the inner
  /// sequence's elements unchanged.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Index of the current outer element; starts at -1 and is incremented before each selector call.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Stores the outer sequence and the selector; arguments are checked with debug assertions only.
      /// </summary>
      public SelectManyWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>
      /// Creates a new iterator over the same source and selector; enumeration state is not copied.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator, then the outer enumerator (each only if present), and runs
      /// the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next result, alternating between pulling an outer element
      /// (<c>State_Source</c>) and pulling an element of its inner sequence (<c>State_Result</c>).
      /// </summary>
      /// <returns>
      /// <c>true</c> when a new value has been stored in <c>_current</c>; <c>false</c> once the
      /// outer sequence is exhausted (the iterator disposes itself before returning).
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer sequence.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the index increment, the selector
                                  // or GetAsyncEnumerator below throws, a later DisposeAsync must not
                                  // dispose this already-disposed enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _index).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer sequence exhausted: fall out to the dispose-and-return-false path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back for the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1291. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Async iterator that, for every element of the outer sequence, awaits a selector called with
  /// the element, its zero-based index and the cancellation token to obtain an inner sequence, and
  /// yields the inner sequence's elements unchanged.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: which enumerator the next step advances.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Index of the current outer element; starts at -1 and is incremented before each selector call.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      /// <summary>
      /// Stores the outer sequence and the selector; arguments are checked with debug assertions only.
      /// </summary>
      public SelectManyWithIndexAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>
      /// Creates a new iterator over the same source and selector; enumeration state is not copied.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
      }

      /// <summary>
      /// Disposes the inner enumerator, then the outer enumerator (each only if present), and runs
      /// the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next result, alternating between pulling an outer element
      /// (<c>State_Source</c>) and pulling an element of its inner sequence (<c>State_Result</c>).
      /// </summary>
      /// <returns>
      /// <c>true</c> when a new value has been stored in <c>_current</c>; <c>false</c> once the
      /// outer sequence is exhausted (the iterator disposes itself before returning).
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  // First call: start enumerating the outer sequence.
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Drop the reference right away: if the index increment, the selector
                                  // or GetAsyncEnumerator below throws, a later DisposeAsync must not
                                  // dispose this already-disposed enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _index, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }

                          // Outer sequence exhausted: fall out to the dispose-and-return-false path.
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back for the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1372. #endif
  1373. #endif
  1374. }
  1375. }