Select.cs 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  /// <summary>
  /// Projects every element of <paramref name="source"/> through <paramref name="selector"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Type produced by the selector.</typeparam>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Function applied to each source element.</param>
  /// <returns>An async-enumerable sequence of the projected values.</returns>
  /// <remarks>
  /// When either argument is null, the exception built by <c>Error.ArgumentNull</c> is thrown
  /// before any iterator is created.
  /// </remarks>
  public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // A source that already is an AsyncIterator handles the projection through its own Select method.
      if (source is AsyncIterator<TSource> upstream)
      {
          return upstream.Select(selector);
      }

      // A source that also implements IList<TSource> gets the list-specific iterator.
      if (source is IList<TSource> items)
      {
          return new SelectIListIterator<TSource, TResult>(items, selector);
      }

      return new SelectEnumerableAsyncIterator<TSource, TResult>(source, selector);
  }
  /// <summary>
  /// Projects every element of <paramref name="source"/> through <paramref name="selector"/>,
  /// also passing the element's zero-based position in the sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Type produced by the selector.</typeparam>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Function receiving each element and its index.</param>
  /// <returns>The sequence built by <c>Create</c> from the local projecting iterator.</returns>
  /// <remarks>
  /// When either argument is null, the exception built by <c>Error.ArgumentNull</c> is thrown.
  /// The index is incremented in a checked context, so it overflows with an exception rather than wrapping.
  /// </remarks>
  public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, TResult> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return Create(Project);

      async IAsyncEnumerator<TResult> Project(CancellationToken cancellationToken)
      {
          // Starts at -1 so the first element is seen with index 0 after the increment.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              position = checked(position + 1);

              yield return selector(item, position);
          }
      }
  }
  /// <summary>
  /// Projects every element of <paramref name="source"/> through an asynchronous
  /// <paramref name="selector"/> whose result is awaited.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Type produced by awaiting the selector.</typeparam>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous function applied to each source element.</param>
  /// <returns>An async-enumerable sequence of the projected values.</returns>
  /// <remarks>
  /// When either argument is null, the exception built by <c>Error.ArgumentNull</c> is thrown
  /// before any iterator is created.
  /// </remarks>
  internal static IAsyncEnumerable<TResult> SelectAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // A source that already is an AsyncIterator handles the projection through its own Select method.
      if (source is AsyncIterator<TSource> upstream)
      {
          return upstream.Select(selector);
      }

      // A source that also implements IList<TSource> gets the list-specific iterator.
      if (source is IList<TSource> items)
      {
          return new SelectIListIteratorWithTask<TSource, TResult>(items, selector);
      }

      return new SelectEnumerableAsyncIteratorWithTask<TSource, TResult>(source, selector);
  }
  62. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects every element of <paramref name="source"/> through an asynchronous
  /// <paramref name="selector"/> that also receives a cancellation token.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Type produced by awaiting the selector.</typeparam>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous, cancellable function applied to each source element.</param>
  /// <returns>An async-enumerable sequence of the projected values.</returns>
  /// <remarks>
  /// When either argument is null, the exception built by <c>Error.ArgumentNull</c> is thrown
  /// before any iterator is created.
  /// </remarks>
  internal static IAsyncEnumerable<TResult> SelectAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      // A source that already is an AsyncIterator handles the projection through its own Select method.
      if (source is AsyncIterator<TSource> upstream)
      {
          return upstream.Select(selector);
      }

      // A source that also implements IList<TSource> gets the list-specific iterator.
      if (source is IList<TSource> items)
      {
          return new SelectIListIteratorWithTaskAndCancellation<TSource, TResult>(items, selector);
      }

      return new SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector);
  }
  78. #endif
  /// <summary>
  /// Projects every element of <paramref name="source"/> through an asynchronous
  /// <paramref name="selector"/> that also receives the element's zero-based position.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Type produced by awaiting the selector.</typeparam>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous function receiving each element and its index.</param>
  /// <returns>The sequence built by <c>Create</c> from the local projecting iterator.</returns>
  /// <remarks>
  /// When either argument is null, the exception built by <c>Error.ArgumentNull</c> is thrown.
  /// The index is incremented in a checked context, so it overflows with an exception rather than wrapping.
  /// </remarks>
  internal static IAsyncEnumerable<TResult> SelectAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return Create(Project);

      async IAsyncEnumerator<TResult> Project(CancellationToken cancellationToken)
      {
          // Starts at -1 so the first element is seen with index 0 after the increment.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              position = checked(position + 1);

              yield return await selector(item, position).ConfigureAwait(false);
          }
      }
  }
  99. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Projects every element of <paramref name="source"/> through an asynchronous
  /// <paramref name="selector"/> that receives the element, its zero-based position,
  /// and the enumeration's cancellation token.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Type produced by awaiting the selector.</typeparam>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Asynchronous, cancellable function receiving each element and its index.</param>
  /// <returns>The sequence built by <c>Create</c> from the local projecting iterator.</returns>
  /// <remarks>
  /// When either argument is null, the exception built by <c>Error.ArgumentNull</c> is thrown.
  /// The index is incremented in a checked context, so it overflows with an exception rather than wrapping.
  /// </remarks>
  internal static IAsyncEnumerable<TResult> SelectAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<TResult>> selector)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

      return Create(Project);

      async IAsyncEnumerator<TResult> Project(CancellationToken cancellationToken)
      {
          // Starts at -1 so the first element is seen with index 0 after the increment.
          var position = -1;

          await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              position = checked(position + 1);

              // The same token drives both the source enumeration and the selector call.
              yield return await selector(item, position, cancellationToken).ConfigureAwait(false);
          }
      }
  }
  120. #endif
  /// <summary>
  /// Iterator that applies a synchronous selector to each element of an asynchronous source sequence.
  /// </summary>
  internal sealed class SelectEnumerableAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly Func<TSource, TResult> _selector;

      // Source enumerator; assigned on the first MoveNextCore call and reset to null by DisposeAsync.
      private IAsyncEnumerator<TSource> _enumerator;

      public SelectEnumerableAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _selector = selector;
          _source = source;
      }

      /// <summary>Creates a new iterator over the same source and selector.</summary>
      public override AsyncIteratorBase<TResult> Clone() =>
          new SelectEnumerableAsyncIterator<TSource, TResult>(_source, _selector);

      /// <summary>Disposes the source enumerator, if one was obtained, then runs the base disposal.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator is IAsyncEnumerator<TSource> active)
          {
              await active.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Returns a new iterator over the original source whose selector is the result of
      /// <c>CombineSelectors</c> applied to the current selector and <paramref name="selector"/>.
      /// </summary>
      public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, TResult1> selector) =>
          new SelectEnumerableAsyncIterator<TSource, TResult1>(_source, CombineSelectors(_selector, selector));

      /// <summary>
      /// Advances the source enumerator and stores the projected element in <c>_current</c>.
      /// </summary>
      /// <returns>
      /// True when an element was produced; false after the iterator has disposed itself
      /// because the source is exhausted or the state is neither Allocated nor Iterating.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // First call: obtain the source enumerator, then continue as a normal iteration step.
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating)
          {
              var advanced = await _enumerator.MoveNextAsync().ConfigureAwait(false);
              if (advanced)
              {
                  _current = _selector(_enumerator.Current);
                  return true;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that applies a synchronous selector to each element of a list-backed source.
  /// Also provides count, array and list materialization through <c>IAsyncIListProvider</c>.
  /// </summary>
  internal sealed class SelectIListIterator<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      private readonly IList<TSource> _source;
      private readonly Func<TSource, TResult> _selector;

      // List enumerator; assigned on the first MoveNextCore call and reset to null by DisposeAsync.
      private IEnumerator<TSource> _enumerator;

      public SelectIListIterator(IList<TSource> source, Func<TSource, TResult> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _selector = selector;
          _source = source;
      }

      /// <summary>Creates a new iterator over the same list and selector.</summary>
      public override AsyncIteratorBase<TResult> Clone() =>
          new SelectIListIterator<TSource, TResult>(_source, _selector);

      /// <summary>Disposes the list enumerator, if one was obtained, then runs the base disposal.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator is IEnumerator<TSource> active)
          {
              active.Dispose();
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the elements of the list, invoking the selector on each one and discarding the result.
      /// </summary>
      /// <param name="onlyIfCheap">When true, no work is done and -1 is returned.</param>
      /// <param name="cancellationToken">Checked once before counting starts.</param>
      /// <returns>A completed task holding the element count, or -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (!onlyIfCheap)
          {
              cancellationToken.ThrowIfCancellationRequested();

              var total = 0;

              foreach (var element in _source)
              {
                  // The selector still runs for every element even though its result is unused.
                  _selector(element);
                  total = checked(total + 1);
              }

              return new ValueTask<int>(total);
          }

          return new ValueTask<int>(-1);
      }

      /// <summary>
      /// Returns a new iterator over the original list whose selector is the result of
      /// <c>CombineSelectors</c> applied to the current selector and <paramref name="selector"/>.
      /// </summary>
      public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, TResult1> selector) =>
          new SelectIListIterator<TSource, TResult1>(_source, CombineSelectors(_selector, selector));

      /// <summary>Projects the list into an array, reading elements by index.</summary>
      /// <param name="cancellationToken">Checked once before projection starts.</param>
      /// <returns>A completed task holding the projected array.</returns>
      public ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested();

          // The count is read once; the loop bounds do not track later changes to the list.
          var length = _source.Count;
          var projected = new TResult[length];

          for (var index = 0; index < length; index++)
          {
              var element = _source[index];
              projected[index] = _selector(element);
          }

          return new ValueTask<TResult[]>(projected);
      }

      /// <summary>Projects the list into a new list, reading elements by index.</summary>
      /// <param name="cancellationToken">Checked once before projection starts.</param>
      /// <returns>A completed task holding the projected list.</returns>
      public ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested();

          // The count is read once and also used as the initial capacity of the result.
          var length = _source.Count;
          var projected = new List<TResult>(length);

          for (var index = 0; index < length; index++)
          {
              var element = _source[index];
              projected.Add(_selector(element));
          }

          return new ValueTask<List<TResult>>(projected);
      }

      /// <summary>
      /// Advances the list enumerator and stores the projected element in <c>_current</c>.
      /// </summary>
      /// <returns>
      /// True when an element was produced; false when the list is exhausted (after disposing)
      /// or when the state is neither Allocated nor Iterating (without disposing).
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // First call: obtain the list enumerator, then continue as a normal iteration step.
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetEnumerator();
              _state = AsyncIteratorState.Iterating;
          }

          // In any other state nothing is advanced and nothing is disposed.
          if (_state != AsyncIteratorState.Iterating)
          {
              return false;
          }

          if (_enumerator.MoveNext())
          {
              _current = _selector(_enumerator.Current);
              return true;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator that applies an asynchronous selector to each element of an asynchronous source
  /// sequence and awaits the selector's result.
  /// </summary>
  internal sealed class SelectEnumerableAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly Func<TSource, ValueTask<TResult>> _selector;

      // Source enumerator; assigned on the first MoveNextCore call and reset to null by DisposeAsync.
      private IAsyncEnumerator<TSource> _enumerator;

      public SelectEnumerableAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _selector = selector;
          _source = source;
      }

      /// <summary>Creates a new iterator over the same source and selector.</summary>
      public override AsyncIteratorBase<TResult> Clone() =>
          new SelectEnumerableAsyncIteratorWithTask<TSource, TResult>(_source, _selector);

      /// <summary>Disposes the source enumerator, if one was obtained, then runs the base disposal.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator is IAsyncEnumerator<TSource> active)
          {
              await active.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Returns a new iterator over the original source whose selector is the result of
      /// <c>CombineSelectors</c> applied to the current selector and <paramref name="selector"/>.
      /// </summary>
      public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, ValueTask<TResult1>> selector) =>
          new SelectEnumerableAsyncIteratorWithTask<TSource, TResult1>(_source, CombineSelectors(_selector, selector));

      /// <summary>
      /// Advances the source enumerator, awaits the selector on the new element and stores
      /// the result in <c>_current</c>.
      /// </summary>
      /// <returns>
      /// True when an element was produced; false after the iterator has disposed itself
      /// because the source is exhausted or the state is neither Allocated nor Iterating.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // First call: obtain the source enumerator, then continue as a normal iteration step.
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating)
          {
              var advanced = await _enumerator.MoveNextAsync().ConfigureAwait(false);
              if (advanced)
              {
                  _current = await _selector(_enumerator.Current).ConfigureAwait(false);
                  return true;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  308. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that applies an asynchronous, cancellable selector to each element of an
  /// asynchronous source sequence and awaits the selector's result.
  /// </summary>
  internal sealed class SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly Func<TSource, CancellationToken, ValueTask<TResult>> _selector;

      // Source enumerator; assigned on the first MoveNextCore call and reset to null by DisposeAsync.
      private IAsyncEnumerator<TSource> _enumerator;

      public SelectEnumerableAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _selector = selector;
          _source = source;
      }

      /// <summary>Creates a new iterator over the same source and selector.</summary>
      public override AsyncIteratorBase<TResult> Clone() =>
          new SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);

      /// <summary>Disposes the source enumerator, if one was obtained, then runs the base disposal.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator is IAsyncEnumerator<TSource> active)
          {
              await active.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Returns a new iterator over the original source whose selector is the result of
      /// <c>CombineSelectors</c> applied to the current selector and <paramref name="selector"/>.
      /// </summary>
      public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, CancellationToken, ValueTask<TResult1>> selector) =>
          new SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult1>(_source, CombineSelectors(_selector, selector));

      /// <summary>
      /// Advances the source enumerator, awaits the selector on the new element (passing the
      /// iterator's cancellation token) and stores the result in <c>_current</c>.
      /// </summary>
      /// <returns>
      /// True when an element was produced; false after the iterator has disposed itself
      /// because the source is exhausted or the state is neither Allocated nor Iterating.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // First call: obtain the source enumerator, then continue as a normal iteration step.
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating)
          {
              var advanced = await _enumerator.MoveNextAsync().ConfigureAwait(false);
              if (advanced)
              {
                  _current = await _selector(_enumerator.Current, _cancellationToken).ConfigureAwait(false);
                  return true;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  358. #endif
  359. // NB: LINQ to Objects implements IPartition<TResult> for this. However, it seems incorrect to do so in a trivial
  360. // manner where e.g. TryGetLast simply indexes into the list without running the selector for the first n - 1
  361. // elements in order to ensure side-effects. We should consider whether we want to follow this implementation
  362. // strategy or support IAsyncPartition<TResult> in a less efficient but more correct manner here.
  /// <summary>
  /// Iterator that applies an asynchronous selector to each element of a list-backed source.
  /// Also provides count, array and list materialization through <c>IAsyncIListProvider</c>.
  /// </summary>
  private sealed class SelectIListIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      private readonly IList<TSource> _source;
      private readonly Func<TSource, ValueTask<TResult>> _selector;

      // List enumerator; assigned on the first MoveNextCore call and reset to null by DisposeAsync.
      private IEnumerator<TSource> _enumerator;

      public SelectIListIteratorWithTask(IList<TSource> source, Func<TSource, ValueTask<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _selector = selector;
          _source = source;
      }

      /// <summary>Creates a new iterator over the same list and selector.</summary>
      public override AsyncIteratorBase<TResult> Clone() =>
          new SelectIListIteratorWithTask<TSource, TResult>(_source, _selector);

      /// <summary>Disposes the list enumerator, if one was obtained, then runs the base disposal.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator is IEnumerator<TSource> active)
          {
              active.Dispose();
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the elements of the list, awaiting the selector on each one and discarding the result.
      /// </summary>
      /// <param name="onlyIfCheap">When true, no work is done and -1 is returned.</param>
      /// <param name="cancellationToken">Checked once, inside the asynchronous part, before counting starts.</param>
      /// <returns>A task holding the element count, or a completed task holding -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          return onlyIfCheap ? new ValueTask<int>(-1) : CountAllAsync();

          async ValueTask<int> CountAllAsync()
          {
              cancellationToken.ThrowIfCancellationRequested();

              var total = 0;

              foreach (var element in _source)
              {
                  // The selector still runs for every element even though its result is unused.
                  await _selector(element).ConfigureAwait(false);
                  total = checked(total + 1);
              }

              return total;
          }
      }

      /// <summary>
      /// Returns a new iterator over the original list whose selector is the result of
      /// <c>CombineSelectors</c> applied to the current selector and <paramref name="selector"/>.
      /// </summary>
      public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, ValueTask<TResult1>> selector) =>
          new SelectIListIteratorWithTask<TSource, TResult1>(_source, CombineSelectors(_selector, selector));

      /// <summary>Projects the list into an array, reading elements by index and awaiting each selector call in turn.</summary>
      /// <param name="cancellationToken">Checked once before projection starts.</param>
      /// <returns>A task holding the projected array.</returns>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested();

          // The count is read once; the loop bounds do not track later changes to the list.
          var length = _source.Count;
          var projected = new TResult[length];

          for (var index = 0; index < length; index++)
          {
              projected[index] = await _selector(_source[index]).ConfigureAwait(false);
          }

          return projected;
      }

      /// <summary>Projects the list into a new list, reading elements by index and awaiting each selector call in turn.</summary>
      /// <param name="cancellationToken">Checked once before projection starts.</param>
      /// <returns>A task holding the projected list.</returns>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested();

          // The count is read once and also used as the initial capacity of the result.
          var length = _source.Count;
          var projected = new List<TResult>(length);

          for (var index = 0; index < length; index++)
          {
              projected.Add(await _selector(_source[index]).ConfigureAwait(false));
          }

          return projected;
      }

      /// <summary>
      /// Advances the list enumerator, awaits the selector on the new element and stores
      /// the result in <c>_current</c>.
      /// </summary>
      /// <returns>
      /// True when an element was produced; false after the iterator has disposed itself
      /// because the list is exhausted or the state is neither Allocated nor Iterating.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // First call: obtain the list enumerator, then continue as a normal iteration step.
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetEnumerator();
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating && _enumerator.MoveNext())
          {
              _current = await _selector(_enumerator.Current).ConfigureAwait(false);
              return true;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  456. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator that applies an asynchronous, cancellable selector to each element of a list-backed source.
  /// Also provides count, array and list materialization through <c>IAsyncIListProvider</c>.
  /// </summary>
  private sealed class SelectIListIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  {
      private readonly IList<TSource> _source;
      private readonly Func<TSource, CancellationToken, ValueTask<TResult>> _selector;

      // List enumerator; assigned on the first MoveNextCore call and reset to null by DisposeAsync.
      private IEnumerator<TSource> _enumerator;

      public SelectIListIteratorWithTaskAndCancellation(IList<TSource> source, Func<TSource, CancellationToken, ValueTask<TResult>> selector)
      {
          Debug.Assert(source != null);
          Debug.Assert(selector != null);

          _selector = selector;
          _source = source;
      }

      /// <summary>Creates a new iterator over the same list and selector.</summary>
      public override AsyncIteratorBase<TResult> Clone() =>
          new SelectIListIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);

      /// <summary>Disposes the list enumerator, if one was obtained, then runs the base disposal.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator is IEnumerator<TSource> active)
          {
              active.Dispose();
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the elements of the list, awaiting the selector on each one and discarding the result.
      /// </summary>
      /// <param name="onlyIfCheap">When true, no work is done and -1 is returned.</param>
      /// <param name="cancellationToken">
      /// Checked once, inside the asynchronous part, before counting starts; also passed to every selector call.
      /// </param>
      /// <returns>A task holding the element count, or a completed task holding -1 when <paramref name="onlyIfCheap"/> is true.</returns>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          return onlyIfCheap ? new ValueTask<int>(-1) : CountAllAsync();

          async ValueTask<int> CountAllAsync()
          {
              cancellationToken.ThrowIfCancellationRequested();

              var total = 0;

              foreach (var element in _source)
              {
                  // The selector still runs for every element even though its result is unused.
                  await _selector(element, cancellationToken).ConfigureAwait(false);
                  total = checked(total + 1);
              }

              return total;
          }
      }

      /// <summary>
      /// Returns a new iterator over the original list whose selector is the result of
      /// <c>CombineSelectors</c> applied to the current selector and <paramref name="selector"/>.
      /// </summary>
      public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, CancellationToken, ValueTask<TResult1>> selector) =>
          new SelectIListIteratorWithTaskAndCancellation<TSource, TResult1>(_source, CombineSelectors(_selector, selector));

      /// <summary>Projects the list into an array, reading elements by index and awaiting each selector call in turn.</summary>
      /// <param name="cancellationToken">Checked once before projection starts and passed to every selector call.</param>
      /// <returns>A task holding the projected array.</returns>
      public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested();

          // The count is read once; the loop bounds do not track later changes to the list.
          var length = _source.Count;
          var projected = new TResult[length];

          for (var index = 0; index < length; index++)
          {
              projected[index] = await _selector(_source[index], cancellationToken).ConfigureAwait(false);
          }

          return projected;
      }

      /// <summary>Projects the list into a new list, reading elements by index and awaiting each selector call in turn.</summary>
      /// <param name="cancellationToken">Checked once before projection starts and passed to every selector call.</param>
      /// <returns>A task holding the projected list.</returns>
      public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested();

          // The count is read once and also used as the initial capacity of the result.
          var length = _source.Count;
          var projected = new List<TResult>(length);

          for (var index = 0; index < length; index++)
          {
              projected.Add(await _selector(_source[index], cancellationToken).ConfigureAwait(false));
          }

          return projected;
      }

      /// <summary>
      /// Advances the list enumerator, awaits the selector on the new element (passing the
      /// iterator's cancellation token) and stores the result in <c>_current</c>.
      /// </summary>
      /// <returns>
      /// True when an element was produced; false after the iterator has disposed itself
      /// because the list is exhausted or the state is neither Allocated nor Iterating.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // First call: obtain the list enumerator, then continue as a normal iteration step.
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetEnumerator();
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating && _enumerator.MoveNext())
          {
              _current = await _selector(_enumerator.Current, _cancellationToken).ConfigureAwait(false);
              return true;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  550. #endif
  551. }
  552. }