Select.cs 26 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. /// <summary>
  12. /// Projects each element of an async-enumerable sequence into a new form.
  13. /// </summary>
  14. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  15. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by running the selector function for each element in the source sequence.</typeparam>
  16. /// <param name="source">A sequence of elements to invoke a transform function on.</param>
  17. /// <param name="selector">A transform function to apply to each source element.</param>
  18. /// <returns>An async-enumerable sequence whose elements are the result of invoking the transform function on each element of source.</returns>
  19. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  20. public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector)
  21. {
  22. if (source == null)
  23. throw Error.ArgumentNull(nameof(source));
  24. if (selector == null)
  25. throw Error.ArgumentNull(nameof(selector));
  26. return source switch
  27. {
  28. AsyncIterator<TSource> iterator => iterator.Select(selector),
  29. IList<TSource> list => new SelectIListIterator<TSource, TResult>(list, selector),
  30. _ => new SelectEnumerableAsyncIterator<TSource, TResult>(source, selector),
  31. };
  32. }
  33. /// <summary>
  34. /// Projects each element of an async-enumerable sequence into a new form by incorporating the element's index.
  35. /// </summary>
  36. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  37. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by running the selector function for each element in the source sequence.</typeparam>
  38. /// <param name="source">A sequence of elements to invoke a transform function on.</param>
  39. /// <param name="selector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
  40. /// <returns>An async-enumerable sequence whose elements are the result of invoking the transform function on each element of source.</returns>
  41. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  42. public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, TResult> selector)
  43. {
  44. if (source == null)
  45. throw Error.ArgumentNull(nameof(source));
  46. if (selector == null)
  47. throw Error.ArgumentNull(nameof(selector));
  48. return Core(source, selector);
  49. static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, TResult> selector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
  50. {
  51. var index = -1;
  52. await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  53. {
  54. checked
  55. {
  56. index++;
  57. }
  58. yield return selector(element, index);
  59. }
  60. }
  61. }
  62. internal static IAsyncEnumerable<TResult> SelectAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector)
  63. {
  64. if (source == null)
  65. throw Error.ArgumentNull(nameof(source));
  66. if (selector == null)
  67. throw Error.ArgumentNull(nameof(selector));
  68. return source switch
  69. {
  70. AsyncIterator<TSource> iterator => iterator.Select(selector),
  71. IList<TSource> list => new SelectIListIteratorWithTask<TSource, TResult>(list, selector),
  72. _ => new SelectEnumerableAsyncIteratorWithTask<TSource, TResult>(source, selector),
  73. };
  74. }
  75. #if !NO_DEEP_CANCELLATION
  76. internal static IAsyncEnumerable<TResult> SelectAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TResult>> selector)
  77. {
  78. if (source == null)
  79. throw Error.ArgumentNull(nameof(source));
  80. if (selector == null)
  81. throw Error.ArgumentNull(nameof(selector));
  82. return source switch
  83. {
  84. AsyncIterator<TSource> iterator => iterator.Select(selector),
  85. IList<TSource> list => new SelectIListIteratorWithTaskAndCancellation<TSource, TResult>(list, selector),
  86. _ => new SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult>(source, selector),
  87. };
  88. }
  89. #endif
  90. internal static IAsyncEnumerable<TResult> SelectAwaitCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<TResult>> selector)
  91. {
  92. if (source == null)
  93. throw Error.ArgumentNull(nameof(source));
  94. if (selector == null)
  95. throw Error.ArgumentNull(nameof(selector));
  96. return Core(source, selector);
  97. static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<TResult>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
  98. {
  99. var index = -1;
  100. await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  101. {
  102. checked
  103. {
  104. index++;
  105. }
  106. yield return await selector(element, index).ConfigureAwait(false);
  107. }
  108. }
  109. }
  110. #if !NO_DEEP_CANCELLATION
  111. internal static IAsyncEnumerable<TResult> SelectAwaitWithCancellationCore<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<TResult>> selector)
  112. {
  113. if (source == null)
  114. throw Error.ArgumentNull(nameof(source));
  115. if (selector == null)
  116. throw Error.ArgumentNull(nameof(selector));
  117. return Core(source, selector);
  118. static async IAsyncEnumerable<TResult> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<TResult>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
  119. {
  120. var index = -1;
  121. await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  122. {
  123. checked
  124. {
  125. index++;
  126. }
  127. yield return await selector(element, index, cancellationToken).ConfigureAwait(false);
  128. }
  129. }
  130. }
  131. #endif
  132. internal sealed class SelectEnumerableAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
  133. {
  134. private readonly Func<TSource, TResult> _selector;
  135. private readonly IAsyncEnumerable<TSource> _source;
  136. private IAsyncEnumerator<TSource>? _enumerator;
  137. public SelectEnumerableAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector)
  138. {
  139. _source = source;
  140. _selector = selector;
  141. }
  142. public override AsyncIteratorBase<TResult> Clone()
  143. {
  144. return new SelectEnumerableAsyncIterator<TSource, TResult>(_source, _selector);
  145. }
  146. public override async ValueTask DisposeAsync()
  147. {
  148. if (_enumerator != null)
  149. {
  150. await _enumerator.DisposeAsync().ConfigureAwait(false);
  151. _enumerator = null;
  152. }
  153. await base.DisposeAsync().ConfigureAwait(false);
  154. }
  155. public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, TResult1> selector)
  156. {
  157. return new SelectEnumerableAsyncIterator<TSource, TResult1>(_source, CombineSelectors(_selector, selector));
  158. }
  159. protected override async ValueTask<bool> MoveNextCore()
  160. {
  161. switch (_state)
  162. {
  163. case AsyncIteratorState.Allocated:
  164. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  165. _state = AsyncIteratorState.Iterating;
  166. goto case AsyncIteratorState.Iterating;
  167. case AsyncIteratorState.Iterating:
  168. if (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
  169. {
  170. _current = _selector(_enumerator.Current);
  171. return true;
  172. }
  173. break;
  174. }
  175. await DisposeAsync().ConfigureAwait(false);
  176. return false;
  177. }
  178. }
  179. internal sealed class SelectIListIterator<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  180. {
  181. private readonly Func<TSource, TResult> _selector;
  182. private readonly IList<TSource> _source;
  183. private IEnumerator<TSource>? _enumerator;
  184. public SelectIListIterator(IList<TSource> source, Func<TSource, TResult> selector)
  185. {
  186. _source = source;
  187. _selector = selector;
  188. }
  189. public override AsyncIteratorBase<TResult> Clone()
  190. {
  191. return new SelectIListIterator<TSource, TResult>(_source, _selector);
  192. }
  193. public override async ValueTask DisposeAsync()
  194. {
  195. if (_enumerator != null)
  196. {
  197. _enumerator.Dispose();
  198. _enumerator = null;
  199. }
  200. await base.DisposeAsync().ConfigureAwait(false);
  201. }
  202. public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
  203. {
  204. if (onlyIfCheap)
  205. {
  206. return new ValueTask<int>(-1);
  207. }
  208. cancellationToken.ThrowIfCancellationRequested();
  209. var count = 0;
  210. foreach (var item in _source)
  211. {
  212. _selector(item);
  213. checked
  214. {
  215. count++;
  216. }
  217. }
  218. return new ValueTask<int>(count);
  219. }
  220. public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, TResult1> selector)
  221. {
  222. return new SelectIListIterator<TSource, TResult1>(_source, CombineSelectors(_selector, selector));
  223. }
  224. public ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
  225. {
  226. cancellationToken.ThrowIfCancellationRequested();
  227. var n = _source.Count;
  228. var res = new TResult[n];
  229. for (var i = 0; i < n; i++)
  230. {
  231. res[i] = _selector(_source[i]);
  232. }
  233. return new ValueTask<TResult[]>(res);
  234. }
  235. public ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
  236. {
  237. cancellationToken.ThrowIfCancellationRequested();
  238. var n = _source.Count;
  239. var res = new List<TResult>(n);
  240. for (var i = 0; i < n; i++)
  241. {
  242. res.Add(_selector(_source[i]));
  243. }
  244. return new ValueTask<List<TResult>>(res);
  245. }
  246. protected override async ValueTask<bool> MoveNextCore()
  247. {
  248. switch (_state)
  249. {
  250. case AsyncIteratorState.Allocated:
  251. _enumerator = _source.GetEnumerator();
  252. _state = AsyncIteratorState.Iterating;
  253. goto case AsyncIteratorState.Iterating;
  254. case AsyncIteratorState.Iterating:
  255. if (_enumerator!.MoveNext())
  256. {
  257. _current = _selector(_enumerator.Current);
  258. return true;
  259. }
  260. await DisposeAsync().ConfigureAwait(false);
  261. break;
  262. }
  263. return false;
  264. }
  265. }
  266. internal sealed class SelectEnumerableAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>
  267. {
  268. private readonly Func<TSource, ValueTask<TResult>> _selector;
  269. private readonly IAsyncEnumerable<TSource> _source;
  270. private IAsyncEnumerator<TSource>? _enumerator;
  271. public SelectEnumerableAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector)
  272. {
  273. _source = source;
  274. _selector = selector;
  275. }
  276. public override AsyncIteratorBase<TResult> Clone()
  277. {
  278. return new SelectEnumerableAsyncIteratorWithTask<TSource, TResult>(_source, _selector);
  279. }
  280. public override async ValueTask DisposeAsync()
  281. {
  282. if (_enumerator != null)
  283. {
  284. await _enumerator.DisposeAsync().ConfigureAwait(false);
  285. _enumerator = null;
  286. }
  287. await base.DisposeAsync().ConfigureAwait(false);
  288. }
  289. public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, ValueTask<TResult1>> selector)
  290. {
  291. return new SelectEnumerableAsyncIteratorWithTask<TSource, TResult1>(_source, CombineSelectors(_selector, selector));
  292. }
  293. protected override async ValueTask<bool> MoveNextCore()
  294. {
  295. switch (_state)
  296. {
  297. case AsyncIteratorState.Allocated:
  298. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  299. _state = AsyncIteratorState.Iterating;
  300. goto case AsyncIteratorState.Iterating;
  301. case AsyncIteratorState.Iterating:
  302. if (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
  303. {
  304. _current = await _selector(_enumerator.Current).ConfigureAwait(false);
  305. return true;
  306. }
  307. break;
  308. }
  309. await DisposeAsync().ConfigureAwait(false);
  310. return false;
  311. }
  312. }
  313. #if !NO_DEEP_CANCELLATION
  314. internal sealed class SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>
  315. {
  316. private readonly Func<TSource, CancellationToken, ValueTask<TResult>> _selector;
  317. private readonly IAsyncEnumerable<TSource> _source;
  318. private IAsyncEnumerator<TSource>? _enumerator;
  319. public SelectEnumerableAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TResult>> selector)
  320. {
  321. _source = source;
  322. _selector = selector;
  323. }
  324. public override AsyncIteratorBase<TResult> Clone()
  325. {
  326. return new SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
  327. }
  328. public override async ValueTask DisposeAsync()
  329. {
  330. if (_enumerator != null)
  331. {
  332. await _enumerator.DisposeAsync().ConfigureAwait(false);
  333. _enumerator = null;
  334. }
  335. await base.DisposeAsync().ConfigureAwait(false);
  336. }
  337. public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, CancellationToken, ValueTask<TResult1>> selector)
  338. {
  339. return new SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult1>(_source, CombineSelectors(_selector, selector));
  340. }
  341. protected override async ValueTask<bool> MoveNextCore()
  342. {
  343. switch (_state)
  344. {
  345. case AsyncIteratorState.Allocated:
  346. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  347. _state = AsyncIteratorState.Iterating;
  348. goto case AsyncIteratorState.Iterating;
  349. case AsyncIteratorState.Iterating:
  350. if (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
  351. {
  352. _current = await _selector(_enumerator.Current, _cancellationToken).ConfigureAwait(false);
  353. return true;
  354. }
  355. break;
  356. }
  357. await DisposeAsync().ConfigureAwait(false);
  358. return false;
  359. }
  360. }
  361. #endif
  362. // NB: LINQ to Objects implements IPartition<TResult> for this. However, it seems incorrect to do so in a trivial
  363. // manner where e.g. TryGetLast simply indexes into the list without running the selector for the first n - 1
  364. // elements in order to ensure side-effects. We should consider whether we want to follow this implementation
  365. // strategy or support IAsyncPartition<TResult> in a less efficient but more correct manner here.
  366. private sealed class SelectIListIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  367. {
  368. private readonly Func<TSource, ValueTask<TResult>> _selector;
  369. private readonly IList<TSource> _source;
  370. private IEnumerator<TSource>? _enumerator;
  371. public SelectIListIteratorWithTask(IList<TSource> source, Func<TSource, ValueTask<TResult>> selector)
  372. {
  373. _source = source;
  374. _selector = selector;
  375. }
  376. public override AsyncIteratorBase<TResult> Clone()
  377. {
  378. return new SelectIListIteratorWithTask<TSource, TResult>(_source, _selector);
  379. }
  380. public override async ValueTask DisposeAsync()
  381. {
  382. if (_enumerator != null)
  383. {
  384. _enumerator.Dispose();
  385. _enumerator = null;
  386. }
  387. await base.DisposeAsync().ConfigureAwait(false);
  388. }
  389. public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
  390. {
  391. if (onlyIfCheap)
  392. {
  393. return new ValueTask<int>(-1);
  394. }
  395. return Core();
  396. async ValueTask<int> Core()
  397. {
  398. cancellationToken.ThrowIfCancellationRequested();
  399. var count = 0;
  400. foreach (var item in _source)
  401. {
  402. await _selector(item).ConfigureAwait(false);
  403. checked
  404. {
  405. count++;
  406. }
  407. }
  408. return count;
  409. }
  410. }
  411. public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, ValueTask<TResult1>> selector)
  412. {
  413. return new SelectIListIteratorWithTask<TSource, TResult1>(_source, CombineSelectors(_selector, selector));
  414. }
  415. public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
  416. {
  417. cancellationToken.ThrowIfCancellationRequested();
  418. var n = _source.Count;
  419. var res = new TResult[n];
  420. for (var i = 0; i < n; i++)
  421. {
  422. res[i] = await _selector(_source[i]).ConfigureAwait(false);
  423. }
  424. return res;
  425. }
  426. public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
  427. {
  428. cancellationToken.ThrowIfCancellationRequested();
  429. var n = _source.Count;
  430. var res = new List<TResult>(n);
  431. for (var i = 0; i < n; i++)
  432. {
  433. res.Add(await _selector(_source[i]).ConfigureAwait(false));
  434. }
  435. return res;
  436. }
  437. protected override async ValueTask<bool> MoveNextCore()
  438. {
  439. switch (_state)
  440. {
  441. case AsyncIteratorState.Allocated:
  442. _enumerator = _source.GetEnumerator();
  443. _state = AsyncIteratorState.Iterating;
  444. goto case AsyncIteratorState.Iterating;
  445. case AsyncIteratorState.Iterating:
  446. if (_enumerator!.MoveNext())
  447. {
  448. _current = await _selector(_enumerator.Current).ConfigureAwait(false);
  449. return true;
  450. }
  451. break;
  452. }
  453. await DisposeAsync().ConfigureAwait(false);
  454. return false;
  455. }
  456. }
  457. #if !NO_DEEP_CANCELLATION
  458. private sealed class SelectIListIteratorWithTaskAndCancellation<TSource, TResult> : AsyncIterator<TResult>, IAsyncIListProvider<TResult>
  459. {
  460. private readonly Func<TSource, CancellationToken, ValueTask<TResult>> _selector;
  461. private readonly IList<TSource> _source;
  462. private IEnumerator<TSource>? _enumerator;
  463. public SelectIListIteratorWithTaskAndCancellation(IList<TSource> source, Func<TSource, CancellationToken, ValueTask<TResult>> selector)
  464. {
  465. _source = source;
  466. _selector = selector;
  467. }
  468. public override AsyncIteratorBase<TResult> Clone()
  469. {
  470. return new SelectIListIteratorWithTaskAndCancellation<TSource, TResult>(_source, _selector);
  471. }
  472. public override async ValueTask DisposeAsync()
  473. {
  474. if (_enumerator != null)
  475. {
  476. _enumerator.Dispose();
  477. _enumerator = null;
  478. }
  479. await base.DisposeAsync().ConfigureAwait(false);
  480. }
  481. public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
  482. {
  483. if (onlyIfCheap)
  484. {
  485. return new ValueTask<int>(-1);
  486. }
  487. return Core();
  488. async ValueTask<int> Core()
  489. {
  490. cancellationToken.ThrowIfCancellationRequested();
  491. var count = 0;
  492. foreach (var item in _source)
  493. {
  494. await _selector(item, cancellationToken).ConfigureAwait(false);
  495. checked
  496. {
  497. count++;
  498. }
  499. }
  500. return count;
  501. }
  502. }
  503. public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, CancellationToken, ValueTask<TResult1>> selector)
  504. {
  505. return new SelectIListIteratorWithTaskAndCancellation<TSource, TResult1>(_source, CombineSelectors(_selector, selector));
  506. }
  507. public async ValueTask<TResult[]> ToArrayAsync(CancellationToken cancellationToken)
  508. {
  509. cancellationToken.ThrowIfCancellationRequested();
  510. var n = _source.Count;
  511. var res = new TResult[n];
  512. for (var i = 0; i < n; i++)
  513. {
  514. res[i] = await _selector(_source[i], cancellationToken).ConfigureAwait(false);
  515. }
  516. return res;
  517. }
  518. public async ValueTask<List<TResult>> ToListAsync(CancellationToken cancellationToken)
  519. {
  520. cancellationToken.ThrowIfCancellationRequested();
  521. var n = _source.Count;
  522. var res = new List<TResult>(n);
  523. for (var i = 0; i < n; i++)
  524. {
  525. res.Add(await _selector(_source[i], cancellationToken).ConfigureAwait(false));
  526. }
  527. return res;
  528. }
  529. protected override async ValueTask<bool> MoveNextCore()
  530. {
  531. switch (_state)
  532. {
  533. case AsyncIteratorState.Allocated:
  534. _enumerator = _source.GetEnumerator();
  535. _state = AsyncIteratorState.Iterating;
  536. goto case AsyncIteratorState.Iterating;
  537. case AsyncIteratorState.Iterating:
  538. if (_enumerator!.MoveNext())
  539. {
  540. _current = await _selector(_enumerator.Current, _cancellationToken).ConfigureAwait(false);
  541. return true;
  542. }
  543. break;
  544. }
  545. await DisposeAsync().ConfigureAwait(false);
  546. return false;
  547. }
  548. }
  549. #endif
  550. }
  551. }