1
0

TakeWhile.cs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. public static IAsyncEnumerable<TSource> TakeWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)
  13. {
  14. if (source == null)
  15. throw Error.ArgumentNull(nameof(source));
  16. if (predicate == null)
  17. throw Error.ArgumentNull(nameof(predicate));
  18. #if USE_ASYNC_ITERATOR
  19. return Create(Core);
  20. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  21. {
  22. await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  23. {
  24. if (!predicate(element))
  25. {
  26. break;
  27. }
  28. yield return element;
  29. }
  30. }
  31. #else
  32. return new TakeWhileAsyncIterator<TSource>(source, predicate);
  33. #endif
  34. }
  35. public static IAsyncEnumerable<TSource> TakeWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate)
  36. {
  37. if (source == null)
  38. throw Error.ArgumentNull(nameof(source));
  39. if (predicate == null)
  40. throw Error.ArgumentNull(nameof(predicate));
  41. #if USE_ASYNC_ITERATOR
  42. return Create(Core);
  43. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  44. {
  45. var index = -1;
  46. await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  47. {
  48. checked
  49. {
  50. index++;
  51. }
  52. if (!predicate(element, index))
  53. {
  54. break;
  55. }
  56. yield return element;
  57. }
  58. }
  59. #else
  60. return new TakeWhileWithIndexAsyncIterator<TSource>(source, predicate);
  61. #endif
  62. }
  63. internal static IAsyncEnumerable<TSource> TakeWhileAwaitCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate)
  64. {
  65. if (source == null)
  66. throw Error.ArgumentNull(nameof(source));
  67. if (predicate == null)
  68. throw Error.ArgumentNull(nameof(predicate));
  69. #if USE_ASYNC_ITERATOR
  70. return Create(Core);
  71. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  72. {
  73. await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  74. {
  75. if (!await predicate(element).ConfigureAwait(false))
  76. {
  77. break;
  78. }
  79. yield return element;
  80. }
  81. }
  82. #else
  83. return new TakeWhileAsyncIteratorWithTask<TSource>(source, predicate);
  84. #endif
  85. }
  86. #if !NO_DEEP_CANCELLATION
  87. internal static IAsyncEnumerable<TSource> TakeWhileAwaitWithCancellationCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate)
  88. {
  89. if (source == null)
  90. throw Error.ArgumentNull(nameof(source));
  91. if (predicate == null)
  92. throw Error.ArgumentNull(nameof(predicate));
  93. #if USE_ASYNC_ITERATOR
  94. return Create(Core);
  95. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  96. {
  97. await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  98. {
  99. if (!await predicate(element, cancellationToken).ConfigureAwait(false))
  100. {
  101. break;
  102. }
  103. yield return element;
  104. }
  105. }
  106. #else
  107. return new TakeWhileAsyncIteratorWithTaskAndCancellation<TSource>(source, predicate);
  108. #endif
  109. }
  110. #endif
  111. internal static IAsyncEnumerable<TSource> TakeWhileAwaitCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>> predicate)
  112. {
  113. if (source == null)
  114. throw Error.ArgumentNull(nameof(source));
  115. if (predicate == null)
  116. throw Error.ArgumentNull(nameof(predicate));
  117. #if USE_ASYNC_ITERATOR
  118. return Create(Core);
  119. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  120. {
  121. var index = -1;
  122. await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  123. {
  124. checked
  125. {
  126. index++;
  127. }
  128. if (!await predicate(element, index).ConfigureAwait(false))
  129. {
  130. break;
  131. }
  132. yield return element;
  133. }
  134. }
  135. #else
  136. return new TakeWhileWithIndexAsyncIteratorWithTask<TSource>(source, predicate);
  137. #endif
  138. }
  139. #if !NO_DEEP_CANCELLATION
  140. internal static IAsyncEnumerable<TSource> TakeWhileAwaitWithCancellationCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<bool>> predicate)
  141. {
  142. if (source == null)
  143. throw Error.ArgumentNull(nameof(source));
  144. if (predicate == null)
  145. throw Error.ArgumentNull(nameof(predicate));
  146. #if USE_ASYNC_ITERATOR
  147. return Create(Core);
  148. async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
  149. {
  150. var index = -1;
  151. await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  152. {
  153. checked
  154. {
  155. index++;
  156. }
  157. if (!await predicate(element, index, cancellationToken).ConfigureAwait(false))
  158. {
  159. break;
  160. }
  161. yield return element;
  162. }
  163. }
  164. #else
  165. return new TakeWhileWithIndexAsyncIteratorWithTaskAndCancellation<TSource>(source, predicate);
  166. #endif
  167. }
  168. #endif
  169. #if !USE_ASYNC_ITERATOR
  170. private sealed class TakeWhileAsyncIterator<TSource> : AsyncIterator<TSource>
  171. {
  172. private readonly Func<TSource, bool> _predicate;
  173. private readonly IAsyncEnumerable<TSource> _source;
  174. private IAsyncEnumerator<TSource> _enumerator;
  175. public TakeWhileAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)
  176. {
  177. Debug.Assert(predicate != null);
  178. Debug.Assert(source != null);
  179. _source = source;
  180. _predicate = predicate;
  181. }
  182. public override AsyncIteratorBase<TSource> Clone()
  183. {
  184. return new TakeWhileAsyncIterator<TSource>(_source, _predicate);
  185. }
  186. public override async ValueTask DisposeAsync()
  187. {
  188. if (_enumerator != null)
  189. {
  190. await _enumerator.DisposeAsync().ConfigureAwait(false);
  191. _enumerator = null;
  192. }
  193. await base.DisposeAsync().ConfigureAwait(false);
  194. }
  195. protected override async ValueTask<bool> MoveNextCore()
  196. {
  197. switch (_state)
  198. {
  199. case AsyncIteratorState.Allocated:
  200. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  201. _state = AsyncIteratorState.Iterating;
  202. goto case AsyncIteratorState.Iterating;
  203. case AsyncIteratorState.Iterating:
  204. if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  205. {
  206. var item = _enumerator.Current;
  207. if (!_predicate(item))
  208. {
  209. break;
  210. }
  211. _current = item;
  212. return true;
  213. }
  214. break;
  215. }
  216. await DisposeAsync().ConfigureAwait(false);
  217. return false;
  218. }
  219. }
  220. private sealed class TakeWhileWithIndexAsyncIterator<TSource> : AsyncIterator<TSource>
  221. {
  222. private readonly Func<TSource, int, bool> _predicate;
  223. private readonly IAsyncEnumerable<TSource> _source;
  224. private IAsyncEnumerator<TSource> _enumerator;
  225. private int _index;
  226. public TakeWhileWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate)
  227. {
  228. Debug.Assert(predicate != null);
  229. Debug.Assert(source != null);
  230. _source = source;
  231. _predicate = predicate;
  232. }
  233. public override AsyncIteratorBase<TSource> Clone()
  234. {
  235. return new TakeWhileWithIndexAsyncIterator<TSource>(_source, _predicate);
  236. }
  237. public override async ValueTask DisposeAsync()
  238. {
  239. if (_enumerator != null)
  240. {
  241. await _enumerator.DisposeAsync().ConfigureAwait(false);
  242. _enumerator = null;
  243. }
  244. await base.DisposeAsync().ConfigureAwait(false);
  245. }
  246. protected override async ValueTask<bool> MoveNextCore()
  247. {
  248. switch (_state)
  249. {
  250. case AsyncIteratorState.Allocated:
  251. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  252. _index = -1;
  253. _state = AsyncIteratorState.Iterating;
  254. goto case AsyncIteratorState.Iterating;
  255. case AsyncIteratorState.Iterating:
  256. if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  257. {
  258. var item = _enumerator.Current;
  259. checked
  260. {
  261. _index++;
  262. }
  263. if (!_predicate(item, _index))
  264. {
  265. break;
  266. }
  267. _current = item;
  268. return true;
  269. }
  270. break;
  271. }
  272. await DisposeAsync().ConfigureAwait(false);
  273. return false;
  274. }
  275. }
  276. private sealed class TakeWhileAsyncIteratorWithTask<TSource> : AsyncIterator<TSource>
  277. {
  278. private readonly Func<TSource, ValueTask<bool>> _predicate;
  279. private readonly IAsyncEnumerable<TSource> _source;
  280. private IAsyncEnumerator<TSource> _enumerator;
  281. public TakeWhileAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate)
  282. {
  283. Debug.Assert(predicate != null);
  284. Debug.Assert(source != null);
  285. _source = source;
  286. _predicate = predicate;
  287. }
  288. public override AsyncIteratorBase<TSource> Clone()
  289. {
  290. return new TakeWhileAsyncIteratorWithTask<TSource>(_source, _predicate);
  291. }
  292. public override async ValueTask DisposeAsync()
  293. {
  294. if (_enumerator != null)
  295. {
  296. await _enumerator.DisposeAsync().ConfigureAwait(false);
  297. _enumerator = null;
  298. }
  299. await base.DisposeAsync().ConfigureAwait(false);
  300. }
  301. protected override async ValueTask<bool> MoveNextCore()
  302. {
  303. switch (_state)
  304. {
  305. case AsyncIteratorState.Allocated:
  306. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  307. _state = AsyncIteratorState.Iterating;
  308. goto case AsyncIteratorState.Iterating;
  309. case AsyncIteratorState.Iterating:
  310. if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  311. {
  312. var item = _enumerator.Current;
  313. if (!await _predicate(item).ConfigureAwait(false))
  314. {
  315. break;
  316. }
  317. _current = item;
  318. return true;
  319. }
  320. break;
  321. }
  322. await DisposeAsync().ConfigureAwait(false);
  323. return false;
  324. }
  325. }
  326. #if !NO_DEEP_CANCELLATION
  327. private sealed class TakeWhileAsyncIteratorWithTaskAndCancellation<TSource> : AsyncIterator<TSource>
  328. {
  329. private readonly Func<TSource, CancellationToken, ValueTask<bool>> _predicate;
  330. private readonly IAsyncEnumerable<TSource> _source;
  331. private IAsyncEnumerator<TSource> _enumerator;
  332. public TakeWhileAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate)
  333. {
  334. Debug.Assert(predicate != null);
  335. Debug.Assert(source != null);
  336. _source = source;
  337. _predicate = predicate;
  338. }
  339. public override AsyncIteratorBase<TSource> Clone()
  340. {
  341. return new TakeWhileAsyncIteratorWithTaskAndCancellation<TSource>(_source, _predicate);
  342. }
  343. public override async ValueTask DisposeAsync()
  344. {
  345. if (_enumerator != null)
  346. {
  347. await _enumerator.DisposeAsync().ConfigureAwait(false);
  348. _enumerator = null;
  349. }
  350. await base.DisposeAsync().ConfigureAwait(false);
  351. }
  352. protected override async ValueTask<bool> MoveNextCore()
  353. {
  354. switch (_state)
  355. {
  356. case AsyncIteratorState.Allocated:
  357. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  358. _state = AsyncIteratorState.Iterating;
  359. goto case AsyncIteratorState.Iterating;
  360. case AsyncIteratorState.Iterating:
  361. if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  362. {
  363. var item = _enumerator.Current;
  364. if (!await _predicate(item, _cancellationToken).ConfigureAwait(false))
  365. {
  366. break;
  367. }
  368. _current = item;
  369. return true;
  370. }
  371. break;
  372. }
  373. await DisposeAsync().ConfigureAwait(false);
  374. return false;
  375. }
  376. }
  377. #endif
  378. private sealed class TakeWhileWithIndexAsyncIteratorWithTask<TSource> : AsyncIterator<TSource>
  379. {
  380. private readonly Func<TSource, int, ValueTask<bool>> _predicate;
  381. private readonly IAsyncEnumerable<TSource> _source;
  382. private IAsyncEnumerator<TSource> _enumerator;
  383. private int _index;
  384. public TakeWhileWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>> predicate)
  385. {
  386. Debug.Assert(predicate != null);
  387. Debug.Assert(source != null);
  388. _source = source;
  389. _predicate = predicate;
  390. }
  391. public override AsyncIteratorBase<TSource> Clone()
  392. {
  393. return new TakeWhileWithIndexAsyncIteratorWithTask<TSource>(_source, _predicate);
  394. }
  395. public override async ValueTask DisposeAsync()
  396. {
  397. if (_enumerator != null)
  398. {
  399. await _enumerator.DisposeAsync().ConfigureAwait(false);
  400. _enumerator = null;
  401. }
  402. await base.DisposeAsync().ConfigureAwait(false);
  403. }
  404. protected override async ValueTask<bool> MoveNextCore()
  405. {
  406. switch (_state)
  407. {
  408. case AsyncIteratorState.Allocated:
  409. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  410. _index = -1;
  411. _state = AsyncIteratorState.Iterating;
  412. goto case AsyncIteratorState.Iterating;
  413. case AsyncIteratorState.Iterating:
  414. if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  415. {
  416. var item = _enumerator.Current;
  417. checked
  418. {
  419. _index++;
  420. }
  421. if (!await _predicate(item, _index).ConfigureAwait(false))
  422. {
  423. break;
  424. }
  425. _current = item;
  426. return true;
  427. }
  428. break;
  429. }
  430. await DisposeAsync().ConfigureAwait(false);
  431. return false;
  432. }
  433. }
  434. #if !NO_DEEP_CANCELLATION
  435. private sealed class TakeWhileWithIndexAsyncIteratorWithTaskAndCancellation<TSource> : AsyncIterator<TSource>
  436. {
  437. private readonly Func<TSource, int, CancellationToken, ValueTask<bool>> _predicate;
  438. private readonly IAsyncEnumerable<TSource> _source;
  439. private IAsyncEnumerator<TSource> _enumerator;
  440. private int _index;
  441. public TakeWhileWithIndexAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<bool>> predicate)
  442. {
  443. Debug.Assert(predicate != null);
  444. Debug.Assert(source != null);
  445. _source = source;
  446. _predicate = predicate;
  447. }
  448. public override AsyncIteratorBase<TSource> Clone()
  449. {
  450. return new TakeWhileWithIndexAsyncIteratorWithTaskAndCancellation<TSource>(_source, _predicate);
  451. }
  452. public override async ValueTask DisposeAsync()
  453. {
  454. if (_enumerator != null)
  455. {
  456. await _enumerator.DisposeAsync().ConfigureAwait(false);
  457. _enumerator = null;
  458. }
  459. await base.DisposeAsync().ConfigureAwait(false);
  460. }
  461. protected override async ValueTask<bool> MoveNextCore()
  462. {
  463. switch (_state)
  464. {
  465. case AsyncIteratorState.Allocated:
  466. _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
  467. _index = -1;
  468. _state = AsyncIteratorState.Iterating;
  469. goto case AsyncIteratorState.Iterating;
  470. case AsyncIteratorState.Iterating:
  471. if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  472. {
  473. var item = _enumerator.Current;
  474. checked
  475. {
  476. _index++;
  477. }
  478. if (!await _predicate(item, _index, _cancellationToken).ConfigureAwait(false))
  479. {
  480. break;
  481. }
  482. _current = item;
  483. return true;
  484. }
  485. break;
  486. }
  487. await DisposeAsync().ConfigureAwait(false);
  488. return false;
  489. }
  490. }
  491. #endif
  492. #endif
  493. }
  494. }