SelectMany.cs 72 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an inner async-enumerable sequence
  /// and exposes the inner sequences' elements as one flattened sequence.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="selector">Projection producing the inner sequence for one source element.</param>
  /// <returns>A lazily evaluated sequence backed by a SelectManyAsyncIterator.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
  {
      // Arguments are validated eagerly, before any iterator object is created.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIterator<TSource, TResult>(source, selector);
  }
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an inner async-enumerable sequence
  /// using an asynchronous selector, and exposes the inner elements as one flattened sequence.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous projection producing the inner sequence for one source element.</param>
  /// <returns>A lazily evaluated sequence backed by a SelectManyAsyncIteratorWithTask.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      // Arguments are validated eagerly, before any iterator object is created.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIteratorWithTask<TSource, TResult>(source, selector);
  }
  28. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an inner async-enumerable sequence
  /// using an asynchronous, cancellable selector, and exposes the inner elements as one flattened sequence.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous projection that also receives a cancellation token.</param>
  /// <returns>A lazily evaluated sequence backed by a SelectManyAsyncIteratorWithTaskAndCancellation.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      // Arguments are validated eagerly, before any iterator object is created.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector);
  }
  37. #endif
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position,
  /// to an inner async-enumerable sequence and yields the inner elements as one flattened sequence.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="selector">Projection receiving the source element and its index.</param>
  /// <returns>A lazily evaluated flattened sequence.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts below zero so the first element is reported at position 0.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: the increment throws rather than wrapping past int.MaxValue.
              position = checked(position + 1);

              var projected = selector(item, position);

              await foreach (var innerItem in projected.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return innerItem;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIterator<TSource, TResult>(source, selector);
#endif
  }
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position,
  /// to an inner async-enumerable sequence using an asynchronous selector, and yields the inner
  /// elements as one flattened sequence.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous projection receiving the source element and its index.</param>
  /// <returns>A lazily evaluated flattened sequence.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts below zero so the first element is reported at position 0.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: the increment throws rather than wrapping past int.MaxValue.
              position = checked(position + 1);

              var projected = await selector(item, position).ConfigureAwait(false);

              await foreach (var innerItem in projected.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return innerItem;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult>(source, selector);
#endif
  }
  94. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position,
  /// to an inner async-enumerable sequence using an asynchronous, cancellable selector, and yields
  /// the inner elements as one flattened sequence.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous projection receiving the source element, its index and a cancellation token.</param>
  /// <returns>A lazily evaluated flattened sequence.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts below zero so the first element is reported at position 0.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: the increment throws rather than wrapping past int.MaxValue.
              position = checked(position + 1);

              // The same token drives the outer enumeration, the selector and the inner enumeration.
              var projected = await selector(item, position, cancellationToken).ConfigureAwait(false);

              await foreach (var innerItem in projected.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return innerItem;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector);
#endif
  }
  123. #endif
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an inner sequence and yields, for every
  /// inner element, the value produced by <paramref name="resultSelector"/> from the outer/inner pair.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Projection producing the inner sequence for one source element.</param>
  /// <param name="resultSelector">Combines a source element with one element of its inner sequence.</param>
  /// <returns>A lazily evaluated flattened sequence of combined results.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var collection = collectionSelector(outer);

              await foreach (var innerItem in collection.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return resultSelector(outer, innerItem);
              }
          }
      }
#else
      return new SelectManyAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an inner sequence using an asynchronous
  /// collection selector and yields, for every inner element, the value asynchronously produced by
  /// <paramref name="resultSelector"/> from the outer/inner pair.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous projection producing the inner sequence for one source element.</param>
  /// <param name="resultSelector">Asynchronously combines a source element with one element of its inner sequence.</param>
  /// <returns>A lazily evaluated flattened sequence of combined results.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var collection = await collectionSelector(outer).ConfigureAwait(false);

              await foreach (var innerItem in collection.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, innerItem).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
#else
      return new SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  174. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects each element of <paramref name="source"/> to an inner sequence using an asynchronous,
  /// cancellable collection selector and yields, for every inner element, the value asynchronously
  /// produced by <paramref name="resultSelector"/> from the outer/inner pair.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous projection receiving the source element and a cancellation token.</param>
  /// <param name="resultSelector">Asynchronously combines a source element with an inner element; also receives the token.</param>
  /// <returns>A lazily evaluated flattened sequence of combined results.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // The same token is handed to both selectors and both enumerations.
              var collection = await collectionSelector(outer, cancellationToken).ConfigureAwait(false);

              await foreach (var innerItem in collection.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, innerItem, cancellationToken).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
#else
      return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  200. #endif
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position, to an
  /// inner sequence and yields, for every inner element, the value produced by
  /// <paramref name="resultSelector"/> from the outer/inner pair.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Projection receiving the source element and its index.</param>
  /// <param name="resultSelector">Combines a source element with one element of its inner sequence.</param>
  /// <returns>A lazily evaluated flattened sequence of combined results.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts below zero so the first element is reported at position 0.
          var position = -1;

          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: the increment throws rather than wrapping past int.MaxValue.
              position = checked(position + 1);

              var collection = collectionSelector(outer, position);

              await foreach (var innerItem in collection.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return resultSelector(outer, innerItem);
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position, to an
  /// inner sequence using an asynchronous collection selector and yields, for every inner element,
  /// the value asynchronously produced by <paramref name="resultSelector"/> from the outer/inner pair.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous projection receiving the source element and its index.</param>
  /// <param name="resultSelector">Asynchronously combines a source element with one element of its inner sequence.</param>
  /// <returns>A lazily evaluated flattened sequence of combined results.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts below zero so the first element is reported at position 0.
          var position = -1;

          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: the increment throws rather than wrapping past int.MaxValue.
              position = checked(position + 1);

              var collection = await collectionSelector(outer, position).ConfigureAwait(false);

              await foreach (var innerItem in collection.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, innerItem).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  261. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects each element of <paramref name="source"/>, together with its zero-based position, to an
  /// inner sequence using an asynchronous, cancellable collection selector and yields, for every inner
  /// element, the value asynchronously produced by <paramref name="resultSelector"/> from the
  /// outer/inner pair.
  /// </summary>
  /// <param name="source">Outer sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Asynchronous projection receiving the source element, its index and a cancellation token.</param>
  /// <param name="resultSelector">Asynchronously combines a source element with an inner element; also receives the token.</param>
  /// <returns>A lazily evaluated flattened sequence of combined results.</returns>
  public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw Error.ArgumentNull(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          // Starts below zero so the first element is reported at position 0.
          var position = -1;

          await foreach (var outer in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Overflow-checked: the increment throws rather than wrapping past int.MaxValue.
              position = checked(position + 1);

              // The same token is handed to both selectors and both enumerations.
              var collection = await collectionSelector(outer, position, cancellationToken).ConfigureAwait(false);

              await foreach (var innerItem in collection.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  var combined = await resultSelector(outer, innerItem, cancellationToken).ConfigureAwait(false);
                  yield return combined;
              }
          }
      }
#else
      return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#endif
  }
  292. #endif
  /// <summary>
  /// Iterator behind the selector-only SelectMany overload: for each element of the source sequence
  /// it invokes the selector and yields every element of the resulting inner sequence before
  /// advancing to the next source element.
  /// </summary>
  private sealed class SelectManyAsyncIterator<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: whether the next step pulls from the outer source or from the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, IAsyncEnumerable<TResult>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector, with no enumeration state.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIterator<TSource, TResult>(_source, _selector);
      }

      /// <summary>Disposes the inner enumerator first, then the source enumerator, then the base iterator.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the flattened elements by summing the counts of all inner sequences.
      /// </summary>
      /// <param name="onlyIfCheap">When true, -1 is returned immediately because counting requires a full enumeration.</param>
      /// <param name="cancellationToken">Token observed by the source enumeration and by each inner count.</param>
      /// <returns>The total element count, or -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          async ValueTask<int> Core(CancellationToken token)
          {
              var count = 0;

#if USE_AWAIT_FOREACH
              await foreach (var element in _source.WithCancellation(token).ConfigureAwait(false))
              {
                  // Overflow-checked accumulation; the token is forwarded so inner counting is cancellable too.
                  checked
                  {
                      count += await _selector(element).CountAsync(token).ConfigureAwait(false);
                  }
              }
#else
              var e = _source.GetAsyncEnumerator(token);

              try
              {
                  while (await e.MoveNextAsync().ConfigureAwait(false))
                  {
                      // Overflow-checked accumulation; the token is forwarded so inner counting is cancellable too.
                      checked
                      {
                          count += await _selector(e.Current).CountAsync(token).ConfigureAwait(false);
                      }
                  }
              }
              finally
              {
                  await e.DisposeAsync().ConfigureAwait(false);
              }
#endif

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence into a list, appending each inner sequence in source order.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

#if USE_AWAIT_FOREACH
          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = _selector(element);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }
#else
          var e = _source.GetAsyncEnumerator(cancellationToken);

          try
          {
              while (await e.MoveNextAsync().ConfigureAwait(false))
              {
                  var items = _selector(e.Current);

                  await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
              }
          }
          finally
          {
              await e.DisposeAsync().ConfigureAwait(false);
          }
#endif

          return list;
      }

      /// <summary>
      /// Advances the flattened enumeration, alternating between the outer source (State_Source)
      /// and the current inner sequence (State_Result).
      /// </summary>
      /// <returns>True when a new element is available in _current; false once the source is exhausted.</returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the field so that, if the selector or GetAsyncEnumerator below throws,
                                  // a later DisposeAsync call does not dispose this enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = _selector(_sourceEnumerator.Current);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back to the outer source for the next element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator behind the SelectMany overload taking an asynchronous selector: for each element of the
  /// source sequence it awaits the selector and yields every element of the resulting inner sequence
  /// before advancing to the next source element.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      // Values of _mode: whether the next step pulls from the outer source or from the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      /// <summary>Creates a fresh iterator over the same source and selector, with no enumeration state.</summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTask<TSource, TResult>(_source, _selector);
      }

      /// <summary>Disposes the inner enumerator first, then the source enumerator, then the base iterator.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the flattened elements by summing the counts of all inner sequences.
      /// </summary>
      /// <param name="onlyIfCheap">When true, -1 is returned immediately because counting requires a full enumeration.</param>
      /// <param name="cancellationToken">Token observed by the source enumeration and by each inner count.</param>
      /// <returns>The total element count, or -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return new ValueTask<int>(-1);
          }

          return Core(cancellationToken);

          async ValueTask<int> Core(CancellationToken token)
          {
              var count = 0;

#if USE_AWAIT_FOREACH
              await foreach (var element in _source.WithCancellation(token).ConfigureAwait(false))
              {
                  var items = await _selector(element).ConfigureAwait(false);

                  // Overflow-checked accumulation; the token is forwarded so inner counting is cancellable too.
                  checked
                  {
                      count += await items.CountAsync(token).ConfigureAwait(false);
                  }
              }
#else
              var e = _source.GetAsyncEnumerator(token);

              try
              {
                  while (await e.MoveNextAsync().ConfigureAwait(false))
                  {
                      var items = await _selector(e.Current).ConfigureAwait(false);

                      // Overflow-checked accumulation; the token is forwarded so inner counting is cancellable too.
                      checked
                      {
                          count += await items.CountAsync(token).ConfigureAwait(false);
                      }
                  }
              }
              finally
              {
                  await e.DisposeAsync().ConfigureAwait(false);
              }
#endif

              return count;
          }
      }

      /// <summary>Materializes the flattened sequence into an array by way of <see cref="ToListAsync"/>.</summary>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.

          var list = await ToListAsync(cancellationToken).ConfigureAwait(false);

          return list.ToArray();
      }

      /// <summary>Materializes the flattened sequence into a list, appending each inner sequence in source order.</summary>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          var list = new List<TResult>();

#if USE_AWAIT_FOREACH
          await foreach (var element in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              var items = await _selector(element).ConfigureAwait(false);

              await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
          }
#else
          var e = _source.GetAsyncEnumerator(cancellationToken);

          try
          {
              while (await e.MoveNextAsync().ConfigureAwait(false))
              {
                  var items = await _selector(e.Current).ConfigureAwait(false);

                  await list.AddRangeAsync(items, cancellationToken).ConfigureAwait(false);
              }
          }
          finally
          {
              await e.DisposeAsync().ConfigureAwait(false);
          }
#endif

          return list;
      }

      /// <summary>
      /// Advances the flattened enumeration, alternating between the outer source (State_Source)
      /// and the current inner sequence (State_Result).
      /// </summary>
      /// <returns>True when a new element is available in _current; false once the source is exhausted.</returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Clear the field so that, if the selector or GetAsyncEnumerator below throws,
                                  // a later DisposeAsync call does not dispose this enumerator a second time.
                                  _resultEnumerator = null;
                              }

                              var inner = await _selector(_sourceEnumerator.Current).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted: go back to the outer source for the next element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  583. #if !NO_DEEP_CANCELLATION
  584. private sealed class SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  585. {
  586. private const int State_Source = 1;
  587. private const int State_Result = 2;
  588. private readonly Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> _selector;
  589. private readonly IAsyncEnumerable<TSource> _source;
  590. private int _mode;
  591. private IAsyncEnumerator<TResult> _resultEnumerator;
  592. private IAsyncEnumerator<TSource> _sourceEnumerator;
  593. public SelectManyAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
  594. {
  595. Debug.Assert(source != null);
  596. Debug.Assert(selector != null);
  597. _source = source;
  598. _selector = selector;
  599. }
  600. public override AsyncIteratorBase<TResult> Clone()
  601. {
  602. return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
  603. }
  604. public override async ValueTask DisposeAsync()
  605. {
  606. if (_resultEnumerator != null)
  607. {
  608. await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
  609. _resultEnumerator = null;
  610. }
  611. if (_sourceEnumerator != null)
  612. {
  613. await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
  614. _sourceEnumerator = null;
  615. }
  616. await base.DisposeAsync().ConfigureAwait(false);
  617. }
  618. public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
  619. {
  620. if (onlyIfCheap)
  621. {
  622. return new ValueTask<int>(-1);
  623. }
  624. return Core(cancellationToken);
  625. async ValueTask<int> Core(CancellationToken _cancellationToken)
  626. {
  627. var count = 0;
  628. #if USE_AWAIT_FOREACH
  629. await foreach (var element in _source.WithCancellation(_cancellationToken).ConfigureAwait(false))
  630. {
  631. var items = await _selector(element, _cancellationToken).ConfigureAwait(false);
  632. checked
  633. {
  634. count += await items.CountAsync().ConfigureAwait(false);
  635. }
  636. }
  637. #else
  638. var e = _source.GetAsyncEnumerator(_cancellationToken);
  639. try
  640. {
  641. while (await e.MoveNextAsync().ConfigureAwait(false))
  642. {
  643. var items = await _selector(e.Current, _cancellationToken).ConfigureAwait(false);
  644. checked
  645. {
  646. count += await items.CountAsync().ConfigureAwait(false);
  647. }
  648. }
  649. }
  650. finally
  651. {
  652. await e.DisposeAsync().ConfigureAwait(false);
  653. }
  654. #endif
  655. return count;
  656. }
  657. }
  658. public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
  659. {
  660. // REVIEW: Substitute for SparseArrayBuilder<T> logic once we have access to that.
  661. var list = await ToListAsync(cancellationToken).ConfigureAwait(false);
  662. return list.ToArray();
  663. }
  // Materializes the flattened sequence into a list: every source element is projected
  // through the selector and the resulting inner sequence is appended in order.
  public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
  {
      var result = new List<TResult>();

#if USE_AWAIT_FOREACH
      await foreach (var outerItem in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
      {
          var innerSequence = await _selector(outerItem, cancellationToken).ConfigureAwait(false);
          await result.AddRangeAsync(innerSequence, cancellationToken).ConfigureAwait(false);
      }
#else
      var outerEnumerator = _source.GetAsyncEnumerator(cancellationToken);

      try
      {
          while (await outerEnumerator.MoveNextAsync().ConfigureAwait(false))
          {
              var innerSequence = await _selector(outerEnumerator.Current, cancellationToken).ConfigureAwait(false);
              await result.AddRangeAsync(innerSequence, cancellationToken).ConfigureAwait(false);
          }
      }
      finally
      {
          // Always release the outer enumerator, including when the selector or the append throws.
          await outerEnumerator.DisposeAsync().ConfigureAwait(false);
      }
#endif

      return result;
  }
  // Advances the flattened sequence by one element.
  //
  // _mode alternates between State_Source (pull the next outer element and open its inner
  // sequence) and State_Result (drain the current inner sequence). Returns true with
  // _current set when an inner element is available; once the outer sequence is exhausted
  // the iterator disposes itself and returns false.
  protected override async ValueTask<bool> MoveNextCore()
  {
      switch (_state)
      {
          case AsyncIteratorState.Allocated:
              _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _mode = State_Source;
              _state = AsyncIteratorState.Iterating;
              goto case AsyncIteratorState.Iterating;

          case AsyncIteratorState.Iterating:
              switch (_mode)
              {
                  case State_Source:
                      if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                      {
                          if (_resultEnumerator != null)
                          {
                              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                              // Forget the disposed enumerator: if the selector or GetAsyncEnumerator
                              // below throws, DisposeAsync must not dispose it a second time.
                              _resultEnumerator = null;
                          }

                          var inner = await _selector(_sourceEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                          _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                          _mode = State_Result;
                          goto case State_Result;
                      }
                      break;

                  case State_Result:
                      if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                      {
                          _current = _resultEnumerator.Current;
                          return true;
                      }

                      // Inner sequence exhausted; go back and fetch the next outer element.
                      _mode = State_Source;
                      goto case State_Source; // loop
              }
              break;
      }

      await DisposeAsync().ConfigureAwait(false);
      return false;
  }
  729. }
  730. #endif
  731. #if !USE_ASYNC_ITERATOR
  /// <summary>
  /// Iterator that maps each source element to an inner async sequence through a synchronous
  /// collection selector and yields the result selector's output for every
  /// (source element, inner element) pair.
  /// </summary>
  private sealed class SelectManyAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: fetching the next outer element vs. draining the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, IAsyncEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being drained.
      private TSource _currentSource;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      // Creates a fresh iterator over the same source and selectors.
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIterator<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      // Disposes the inner enumerator, then the outer one, drops the cached outer element,
      // and finally calls the base implementation.
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      // Produces the next projected element, or disposes the iterator and returns false
      // once the outer sequence is exhausted.
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Forget the disposed enumerator: if the selector or GetAsyncEnumerator
                                  // below throws, DisposeAsync must not dispose it a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;
                              var inner = _collectionSelector(_currentSource);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultSelector(_currentSource, _resultEnumerator.Current);
                              return true;
                          }

                          // Inner sequence exhausted; go back and fetch the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that maps each source element to an inner async sequence through an awaitable
  /// collection selector and yields the awaited result selector's output for every
  /// (source element, inner element) pair.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: fetching the next outer element vs. draining the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being drained.
      private TSource _currentSource;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      // Creates a fresh iterator over the same source and selectors.
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      // Disposes the inner enumerator, then the outer one, drops the cached outer element,
      // and finally calls the base implementation.
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      // Produces the next projected element, or disposes the iterator and returns false
      // once the outer sequence is exhausted.
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Forget the disposed enumerator: if the selector or GetAsyncEnumerator
                                  // below throws, DisposeAsync must not dispose it a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;
                              var inner = await _collectionSelector(_currentSource).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted; go back and fetch the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  892. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that maps each source element to an inner async sequence through an awaitable,
  /// cancellation-aware collection selector and yields the awaited result selector's output
  /// for every (source element, inner element) pair. The iterator's cancellation token is
  /// passed to both selectors and to both enumerators.
  /// </summary>
  private sealed class SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: fetching the next outer element vs. draining the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being drained.
      private TSource _currentSource;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      // Creates a fresh iterator over the same source and selectors.
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      // Disposes the inner enumerator, then the outer one, drops the cached outer element,
      // and finally calls the base implementation.
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      // Produces the next projected element, or disposes the iterator and returns false
      // once the outer sequence is exhausted.
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Forget the disposed enumerator: if the selector or GetAsyncEnumerator
                                  // below throws, DisposeAsync must not dispose it a second time.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;
                              var inner = await _collectionSelector(_currentSource, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted; go back and fetch the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  973. #endif
  /// <summary>
  /// Iterator that maps each source element, together with its zero-based position, to an
  /// inner async sequence through a synchronous collection selector and yields the result
  /// selector's output for every (source element, inner element) pair.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: fetching the next outer element vs. draining the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, IAsyncEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being drained.
      private TSource _currentSource;
      // Position of the current outer element; starts at -1 and is incremented per outer element.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      // Creates a fresh iterator over the same source and selectors.
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      // Disposes the inner enumerator, then the outer one, drops the cached outer element,
      // and finally calls the base implementation.
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      // Produces the next projected element, or disposes the iterator and returns false
      // once the outer sequence is exhausted.
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Forget the disposed enumerator: if the increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose it twice.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = _collectionSelector(_currentSource, _index);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultSelector(_currentSource, _resultEnumerator.Current);
                              return true;
                          }

                          // Inner sequence exhausted; go back and fetch the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that maps each source element, together with its zero-based position, to an
  /// inner async sequence through an awaitable collection selector and yields the awaited
  /// result selector's output for every (source element, inner element) pair.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: fetching the next outer element vs. draining the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being drained.
      private TSource _currentSource;
      // Position of the current outer element; starts at -1 and is incremented per outer element.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      // Creates a fresh iterator over the same source and selectors.
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      // Disposes the inner enumerator, then the outer one, drops the cached outer element,
      // and finally calls the base implementation.
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      // Produces the next projected element, or disposes the iterator and returns false
      // once the outer sequence is exhausted.
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Forget the disposed enumerator: if the increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose it twice.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _collectionSelector(_currentSource, _index).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted; go back and fetch the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1146. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that maps each source element, together with its zero-based position, to an
  /// inner async sequence through an awaitable, cancellation-aware collection selector and
  /// yields the awaited result selector's output for every (source element, inner element)
  /// pair. The iterator's cancellation token is passed to both selectors and both enumerators.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: fetching the next outer element vs. draining the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> _collectionSelector;
      private readonly Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> _resultSelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Outer element whose inner sequence is currently being drained.
      private TSource _currentSource;
      // Position of the current outer element; starts at -1 and is incremented per outer element.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TCollection> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyWithIndexAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector)
      {
          Debug.Assert(source != null);
          Debug.Assert(collectionSelector != null);
          Debug.Assert(resultSelector != null);

          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      // Creates a fresh iterator over the same source and selectors.
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TCollection, TResult>(_source, _collectionSelector, _resultSelector);
      }

      // Disposes the inner enumerator, then the outer one, drops the cached outer element,
      // and finally calls the base implementation.
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          _currentSource = default;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      // Produces the next projected element, or disposes the iterator and returns false
      // once the outer sequence is exhausted.
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Forget the disposed enumerator: if the increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose it twice.
                                  _resultEnumerator = null;
                              }

                              _currentSource = _sourceEnumerator.Current;

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _collectionSelector(_currentSource, _index, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = await _resultSelector(_currentSource, _resultEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                              return true;
                          }

                          // Inner sequence exhausted; go back and fetch the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1233. #endif
  /// <summary>
  /// Iterator that maps each source element, together with its zero-based position, to an
  /// inner async sequence through a synchronous selector and yields the inner elements
  /// as they are.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: fetching the next outer element vs. draining the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, IAsyncEnumerable<TResult>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Position of the current outer element; starts at -1 and is incremented per outer element.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      // Creates a fresh iterator over the same source and selector.
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIterator<TSource, TResult>(_source, _selector);
      }

      // Disposes the inner enumerator, then the outer one, and finally calls the base implementation.
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      // Produces the next inner element, or disposes the iterator and returns false
      // once the outer sequence is exhausted.
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Forget the disposed enumerator: if the increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose it twice.
                                  _resultEnumerator = null;
                              }

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = _selector(_sourceEnumerator.Current, _index);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted; go back and fetch the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that maps each source element, together with its zero-based position, to an
  /// inner async sequence through an awaitable selector and yields the inner elements
  /// as they are.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: fetching the next outer element vs. draining the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Position of the current outer element; starts at -1 and is incremented per outer element.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      // Creates a fresh iterator over the same source and selector.
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult>(_source, _selector);
      }

      // Disposes the inner enumerator, then the outer one, and finally calls the base implementation.
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      // Produces the next inner element, or disposes the iterator and returns false
      // once the outer sequence is exhausted.
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Forget the disposed enumerator: if the increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose it twice.
                                  _resultEnumerator = null;
                              }

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _index).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted; go back and fetch the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1394. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that maps each source element, together with its zero-based position, to an
  /// inner async sequence through an awaitable, cancellation-aware selector and yields the
  /// inner elements as they are. The iterator's cancellation token is passed to the selector
  /// and to both enumerators.
  /// </summary>
  private sealed class SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>
  {
      // Values of _mode: fetching the next outer element vs. draining the current inner sequence.
      private const int State_Source = 1;
      private const int State_Result = 2;

      private readonly Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> _selector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Position of the current outer element; starts at -1 and is incremented per outer element.
      private int _index;
      private int _mode;
      private IAsyncEnumerator<TResult> _resultEnumerator;
      private IAsyncEnumerator<TSource> _sourceEnumerator;

      public SelectManyWithIndexAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IAsyncEnumerable<TResult>>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _source = source;
          _selector = selector;
      }

      // Creates a fresh iterator over the same source and selector.
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new SelectManyWithIndexAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
      }

      // Disposes the inner enumerator, then the outer one, and finally calls the base implementation.
      public override async ValueTask DisposeAsync()
      {
          if (_resultEnumerator != null)
          {
              await _resultEnumerator.DisposeAsync().ConfigureAwait(false);
              _resultEnumerator = null;
          }

          if (_sourceEnumerator != null)
          {
              await _sourceEnumerator.DisposeAsync().ConfigureAwait(false);
              _sourceEnumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      // Produces the next inner element, or disposes the iterator and returns false
      // once the outer sequence is exhausted.
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourceEnumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _index = -1;
                  _mode = State_Source;
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (_mode)
                  {
                      case State_Source:
                          if (await _sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              if (_resultEnumerator != null)
                              {
                                  await _resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                  // Forget the disposed enumerator: if the increment, the selector or
                                  // GetAsyncEnumerator below throws, DisposeAsync must not dispose it twice.
                                  _resultEnumerator = null;
                              }

                              // Overflow of the element index raises instead of wrapping around.
                              checked
                              {
                                  _index++;
                              }

                              var inner = await _selector(_sourceEnumerator.Current, _index, _cancellationToken).ConfigureAwait(false);
                              _resultEnumerator = inner.GetAsyncEnumerator(_cancellationToken);

                              _mode = State_Result;
                              goto case State_Result;
                          }
                          break;

                      case State_Result:
                          if (await _resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _resultEnumerator.Current;
                              return true;
                          }

                          // Inner sequence exhausted; go back and fetch the next outer element.
                          _mode = State_Source;
                          goto case State_Source; // loop
                  }
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  1475. #endif
  1476. #endif
  1477. }
  1478. }