Where.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)
  13. {
  14. if (source == null)
  15. throw Error.ArgumentNull(nameof(source));
  16. if (predicate == null)
  17. throw Error.ArgumentNull(nameof(predicate));
  18. if (source is AsyncIteratorBase<TSource> iterator)
  19. {
  20. return iterator.Where(predicate);
  21. }
  22. // TODO: Can we add array/list optimizations here, does it make sense?
  23. return new WhereEnumerableAsyncIterator<TSource>(source, predicate);
  24. }
  25. public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate)
  26. {
  27. if (source == null)
  28. throw Error.ArgumentNull(nameof(source));
  29. if (predicate == null)
  30. throw Error.ArgumentNull(nameof(predicate));
  31. #if USE_ASYNC_ITERATOR
  32. return Create(Core);
  33. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  34. {
  35. var index = -1;
  36. await foreach (var element in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
  37. {
  38. checked
  39. {
  40. index++;
  41. }
  42. if (predicate(element, index))
  43. {
  44. yield return element;
  45. }
  46. }
  47. }
  48. #else
  49. return new WhereEnumerableWithIndexAsyncIterator<TSource>(source, predicate);
  50. #endif
  51. }
  52. internal static IAsyncEnumerable<TSource> WhereAwaitCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate)
  53. {
  54. if (source == null)
  55. throw Error.ArgumentNull(nameof(source));
  56. if (predicate == null)
  57. throw Error.ArgumentNull(nameof(predicate));
  58. if (source is AsyncIteratorBase<TSource> iterator)
  59. {
  60. return iterator.Where(predicate);
  61. }
  62. // TODO: Can we add array/list optimizations here, does it make sense?
  63. return new WhereEnumerableAsyncIteratorWithTask<TSource>(source, predicate);
  64. }
  65. #if !NO_DEEP_CANCELLATION
  66. internal static IAsyncEnumerable<TSource> WhereAwaitWithCancellationCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate)
  67. {
  68. if (source == null)
  69. throw Error.ArgumentNull(nameof(source));
  70. if (predicate == null)
  71. throw Error.ArgumentNull(nameof(predicate));
  72. if (source is AsyncIteratorBase<TSource> iterator)
  73. {
  74. return iterator.Where(predicate);
  75. }
  76. // TODO: Can we add array/list optimizations here, does it make sense?
  77. return new WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource>(source, predicate);
  78. }
  79. #endif
  80. internal static IAsyncEnumerable<TSource> WhereAwaitCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>> predicate)
  81. {
  82. if (source == null)
  83. throw Error.ArgumentNull(nameof(source));
  84. if (predicate == null)
  85. throw Error.ArgumentNull(nameof(predicate));
  86. #if USE_ASYNC_ITERATOR
  87. return Create(Core);
  88. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  89. {
  90. var index = -1;
  91. await foreach (var element in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
  92. {
  93. checked
  94. {
  95. index++;
  96. }
  97. if (await predicate(element, index).ConfigureAwait(false))
  98. {
  99. yield return element;
  100. }
  101. }
  102. }
  103. #else
  104. return new WhereEnumerableWithIndexAsyncIteratorWithTask<TSource>(source, predicate);
  105. #endif
  106. }
  107. #if !NO_DEEP_CANCELLATION
  108. internal static IAsyncEnumerable<TSource> WhereAwaitWithCancellationCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<bool>> predicate)
  109. {
  110. if (source == null)
  111. throw Error.ArgumentNull(nameof(source));
  112. if (predicate == null)
  113. throw Error.ArgumentNull(nameof(predicate));
  114. #if USE_ASYNC_ITERATOR
  115. return Create(Core);
  116. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  117. {
  118. var index = -1;
  119. await foreach (var element in AsyncEnumerableExtensions.WithCancellation(source, cancellationToken).ConfigureAwait(false))
  120. {
  121. checked
  122. {
  123. index++;
  124. }
  125. if (await predicate(element, index, cancellationToken).ConfigureAwait(false))
  126. {
  127. yield return element;
  128. }
  129. }
  130. }
  131. #else
  132. return new WhereEnumerableWithIndexAsyncIteratorWithTaskAndCancellation<TSource>(source, predicate);
  133. #endif
  134. }
  135. #endif
  136. internal sealed class WhereEnumerableAsyncIterator<TSource> : AsyncIterator<TSource>
  137. {
  138. private readonly Func<TSource, bool> _predicate;
  139. private readonly IAsyncEnumerable<TSource> _source;
  140. private IAsyncEnumerator<TSource> _enumerator;
  141. public WhereEnumerableAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)
  142. {
  143. Debug.Assert(source != null);
  144. Debug.Assert(predicate != null);
  145. _source = source;
  146. _predicate = predicate;
  147. }
  148. public override AsyncIteratorBase<TSource> Clone()
  149. {
  150. return new WhereEnumerableAsyncIterator<TSource>(_source, _predicate);
  151. }
  152. public override async ValueTask DisposeAsync()
  153. {
  154. if (_enumerator != null)
  155. {
  156. await _enumerator.DisposeAsync().ConfigureAwait(false);
  157. _enumerator = null;
  158. }
  159. await base.DisposeAsync().ConfigureAwait(false);
  160. }
  161. public override IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, TResult> selector)
  162. {
  163. return new WhereSelectEnumerableAsyncIterator<TSource, TResult>(_source, _predicate, selector);
  164. }
  165. public override IAsyncEnumerable<TSource> Where(Func<TSource, bool> predicate)
  166. {
  167. return new WhereEnumerableAsyncIterator<TSource>(_source, CombinePredicates(_predicate, predicate));
  168. }
  169. protected override async ValueTask<bool> MoveNextCore()
  170. {
  171. switch (_state)
  172. {
  173. case AsyncIteratorState.Allocated:
  174. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  175. _state = AsyncIteratorState.Iterating;
  176. goto case AsyncIteratorState.Iterating;
  177. case AsyncIteratorState.Iterating:
  178. while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  179. {
  180. var item = _enumerator.Current;
  181. if (_predicate(item))
  182. {
  183. _current = item;
  184. return true;
  185. }
  186. }
  187. await DisposeAsync().ConfigureAwait(false);
  188. break;
  189. }
  190. return false;
  191. }
  192. }
  193. #if !USE_ASYNC_ITERATOR
  194. private sealed class WhereEnumerableWithIndexAsyncIterator<TSource> : AsyncIterator<TSource>
  195. {
  196. private readonly Func<TSource, int, bool> _predicate;
  197. private readonly IAsyncEnumerable<TSource> _source;
  198. private IAsyncEnumerator<TSource> _enumerator;
  199. private int _index;
  200. public WhereEnumerableWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate)
  201. {
  202. Debug.Assert(source != null);
  203. Debug.Assert(predicate != null);
  204. _source = source;
  205. _predicate = predicate;
  206. }
  207. public override AsyncIteratorBase<TSource> Clone()
  208. {
  209. return new WhereEnumerableWithIndexAsyncIterator<TSource>(_source, _predicate);
  210. }
  211. public override async ValueTask DisposeAsync()
  212. {
  213. if (_enumerator != null)
  214. {
  215. await _enumerator.DisposeAsync().ConfigureAwait(false);
  216. _enumerator = null;
  217. }
  218. await base.DisposeAsync().ConfigureAwait(false);
  219. }
  220. protected override async ValueTask<bool> MoveNextCore()
  221. {
  222. switch (_state)
  223. {
  224. case AsyncIteratorState.Allocated:
  225. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  226. _index = -1;
  227. _state = AsyncIteratorState.Iterating;
  228. goto case AsyncIteratorState.Iterating;
  229. case AsyncIteratorState.Iterating:
  230. while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  231. {
  232. var item = _enumerator.Current;
  233. checked
  234. {
  235. _index++;
  236. }
  237. if (_predicate(item, _index))
  238. {
  239. _current = item;
  240. return true;
  241. }
  242. }
  243. await DisposeAsync().ConfigureAwait(false);
  244. break;
  245. }
  246. return false;
  247. }
  248. }
  249. #endif
  250. internal sealed class WhereEnumerableAsyncIteratorWithTask<TSource> : AsyncIterator<TSource>
  251. {
  252. private readonly Func<TSource, ValueTask<bool>> _predicate;
  253. private readonly IAsyncEnumerable<TSource> _source;
  254. private IAsyncEnumerator<TSource> _enumerator;
  255. public WhereEnumerableAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate)
  256. {
  257. Debug.Assert(source != null);
  258. Debug.Assert(predicate != null);
  259. _source = source;
  260. _predicate = predicate;
  261. }
  262. public override AsyncIteratorBase<TSource> Clone()
  263. {
  264. return new WhereEnumerableAsyncIteratorWithTask<TSource>(_source, _predicate);
  265. }
  266. public override async ValueTask DisposeAsync()
  267. {
  268. if (_enumerator != null)
  269. {
  270. await _enumerator.DisposeAsync().ConfigureAwait(false);
  271. _enumerator = null;
  272. }
  273. await base.DisposeAsync().ConfigureAwait(false);
  274. }
  275. public override IAsyncEnumerable<TSource> Where(Func<TSource, ValueTask<bool>> predicate)
  276. {
  277. return new WhereEnumerableAsyncIteratorWithTask<TSource>(_source, CombinePredicates(_predicate, predicate));
  278. }
  279. protected override async ValueTask<bool> MoveNextCore()
  280. {
  281. switch (_state)
  282. {
  283. case AsyncIteratorState.Allocated:
  284. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  285. _state = AsyncIteratorState.Iterating;
  286. goto case AsyncIteratorState.Iterating;
  287. case AsyncIteratorState.Iterating:
  288. while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  289. {
  290. var item = _enumerator.Current;
  291. if (await _predicate(item).ConfigureAwait(false))
  292. {
  293. _current = item;
  294. return true;
  295. }
  296. }
  297. await DisposeAsync().ConfigureAwait(false);
  298. break;
  299. }
  300. return false;
  301. }
  302. }
  303. #if !NO_DEEP_CANCELLATION
  304. internal sealed class WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource> : AsyncIterator<TSource>
  305. {
  306. private readonly Func<TSource, CancellationToken, ValueTask<bool>> _predicate;
  307. private readonly IAsyncEnumerable<TSource> _source;
  308. private IAsyncEnumerator<TSource> _enumerator;
  309. public WhereEnumerableAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate)
  310. {
  311. Debug.Assert(source != null);
  312. Debug.Assert(predicate != null);
  313. _source = source;
  314. _predicate = predicate;
  315. }
  316. public override AsyncIteratorBase<TSource> Clone()
  317. {
  318. return new WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource>(_source, _predicate);
  319. }
  320. public override async ValueTask DisposeAsync()
  321. {
  322. if (_enumerator != null)
  323. {
  324. await _enumerator.DisposeAsync().ConfigureAwait(false);
  325. _enumerator = null;
  326. }
  327. await base.DisposeAsync().ConfigureAwait(false);
  328. }
  329. public override IAsyncEnumerable<TSource> Where(Func<TSource, CancellationToken, ValueTask<bool>> predicate)
  330. {
  331. return new WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource>(_source, CombinePredicates(_predicate, predicate));
  332. }
  333. protected override async ValueTask<bool> MoveNextCore()
  334. {
  335. switch (_state)
  336. {
  337. case AsyncIteratorState.Allocated:
  338. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  339. _state = AsyncIteratorState.Iterating;
  340. goto case AsyncIteratorState.Iterating;
  341. case AsyncIteratorState.Iterating:
  342. while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  343. {
  344. var item = _enumerator.Current;
  345. if (await _predicate(item, _cancellationToken).ConfigureAwait(false))
  346. {
  347. _current = item;
  348. return true;
  349. }
  350. }
  351. await DisposeAsync().ConfigureAwait(false);
  352. break;
  353. }
  354. return false;
  355. }
  356. }
  357. #endif
  358. #if !USE_ASYNC_ITERATOR
  359. private sealed class WhereEnumerableWithIndexAsyncIteratorWithTask<TSource> : AsyncIterator<TSource>
  360. {
  361. private readonly Func<TSource, int, ValueTask<bool>> _predicate;
  362. private readonly IAsyncEnumerable<TSource> _source;
  363. private IAsyncEnumerator<TSource> _enumerator;
  364. private int _index;
  365. public WhereEnumerableWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>> predicate)
  366. {
  367. Debug.Assert(source != null);
  368. Debug.Assert(predicate != null);
  369. _source = source;
  370. _predicate = predicate;
  371. }
  372. public override AsyncIteratorBase<TSource> Clone()
  373. {
  374. return new WhereEnumerableWithIndexAsyncIteratorWithTask<TSource>(_source, _predicate);
  375. }
  376. public override async ValueTask DisposeAsync()
  377. {
  378. if (_enumerator != null)
  379. {
  380. await _enumerator.DisposeAsync().ConfigureAwait(false);
  381. _enumerator = null;
  382. }
  383. await base.DisposeAsync().ConfigureAwait(false);
  384. }
  385. protected override async ValueTask<bool> MoveNextCore()
  386. {
  387. switch (_state)
  388. {
  389. case AsyncIteratorState.Allocated:
  390. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  391. _index = -1;
  392. _state = AsyncIteratorState.Iterating;
  393. goto case AsyncIteratorState.Iterating;
  394. case AsyncIteratorState.Iterating:
  395. while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  396. {
  397. var item = _enumerator.Current;
  398. checked
  399. {
  400. _index++;
  401. }
  402. if (await _predicate(item, _index).ConfigureAwait(false))
  403. {
  404. _current = item;
  405. return true;
  406. }
  407. }
  408. await DisposeAsync().ConfigureAwait(false);
  409. break;
  410. }
  411. return false;
  412. }
  413. }
  414. #if !NO_DEEP_CANCELLATION
  415. private sealed class WhereEnumerableWithIndexAsyncIteratorWithTaskAndCancellation<TSource> : AsyncIterator<TSource>
  416. {
  417. private readonly Func<TSource, int, CancellationToken, ValueTask<bool>> _predicate;
  418. private readonly IAsyncEnumerable<TSource> _source;
  419. private IAsyncEnumerator<TSource> _enumerator;
  420. private int _index;
  421. public WhereEnumerableWithIndexAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<bool>> predicate)
  422. {
  423. Debug.Assert(source != null);
  424. Debug.Assert(predicate != null);
  425. _source = source;
  426. _predicate = predicate;
  427. }
  428. public override AsyncIteratorBase<TSource> Clone()
  429. {
  430. return new WhereEnumerableWithIndexAsyncIteratorWithTaskAndCancellation<TSource>(_source, _predicate);
  431. }
  432. public override async ValueTask DisposeAsync()
  433. {
  434. if (_enumerator != null)
  435. {
  436. await _enumerator.DisposeAsync().ConfigureAwait(false);
  437. _enumerator = null;
  438. }
  439. await base.DisposeAsync().ConfigureAwait(false);
  440. }
  441. protected override async ValueTask<bool> MoveNextCore()
  442. {
  443. switch (_state)
  444. {
  445. case AsyncIteratorState.Allocated:
  446. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  447. _index = -1;
  448. _state = AsyncIteratorState.Iterating;
  449. goto case AsyncIteratorState.Iterating;
  450. case AsyncIteratorState.Iterating:
  451. while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  452. {
  453. var item = _enumerator.Current;
  454. checked
  455. {
  456. _index++;
  457. }
  458. if (await _predicate(item, _index, _cancellationToken).ConfigureAwait(false))
  459. {
  460. _current = item;
  461. return true;
  462. }
  463. }
  464. await DisposeAsync().ConfigureAwait(false);
  465. break;
  466. }
  467. return false;
  468. }
  469. }
  470. #endif
  471. #endif
  472. private sealed class WhereSelectEnumerableAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
  473. {
  474. private readonly Func<TSource, bool> _predicate;
  475. private readonly Func<TSource, TResult> _selector;
  476. private readonly IAsyncEnumerable<TSource> _source;
  477. private IAsyncEnumerator<TSource> _enumerator;
  478. public WhereSelectEnumerableAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, Func<TSource, TResult> selector)
  479. {
  480. Debug.Assert(source != null);
  481. Debug.Assert(predicate != null);
  482. Debug.Assert(selector != null);
  483. _source = source;
  484. _predicate = predicate;
  485. _selector = selector;
  486. }
  487. public override AsyncIteratorBase<TResult> Clone()
  488. {
  489. return new WhereSelectEnumerableAsyncIterator<TSource, TResult>(_source, _predicate, _selector);
  490. }
  491. public override async ValueTask DisposeAsync()
  492. {
  493. if (_enumerator != null)
  494. {
  495. await _enumerator.DisposeAsync().ConfigureAwait(false);
  496. _enumerator = null;
  497. }
  498. await base.DisposeAsync().ConfigureAwait(false);
  499. }
  500. public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, TResult1> selector)
  501. {
  502. return new WhereSelectEnumerableAsyncIterator<TSource, TResult1>(_source, _predicate, CombineSelectors(_selector, selector));
  503. }
  504. protected override async ValueTask<bool> MoveNextCore()
  505. {
  506. switch (_state)
  507. {
  508. case AsyncIteratorState.Allocated:
  509. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  510. _state = AsyncIteratorState.Iterating;
  511. goto case AsyncIteratorState.Iterating;
  512. case AsyncIteratorState.Iterating:
  513. while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  514. {
  515. var item = _enumerator.Current;
  516. if (_predicate(item))
  517. {
  518. _current = _selector(item);
  519. return true;
  520. }
  521. }
  522. await DisposeAsync().ConfigureAwait(false);
  523. break;
  524. }
  525. return false;
  526. }
  527. }
  528. }
  529. }