// Scan.cs
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerableEx
  11. {
  // NB: Implementations of Scan never yield the first element, unlike the behavior of Aggregate on a sequence with one
  //     element, which returns the first element (or the seed if given an empty sequence). This is compatible with Rx
  //     but one could argue whether it was the right default.
  /// <summary>
  /// Produces the running accumulation of <paramref name="source"/>. The first element only primes the
  /// accumulator; one result is yielded for each element after it.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements and of the yielded accumulator values.</typeparam>
  /// <param name="source">Sequence to accumulate over. Must not be null.</param>
  /// <param name="accumulator">Combines the current accumulated value with the next element. Must not be null.</param>
  /// <returns>
  /// The intermediate accumulator values; nothing is yielded when the source has fewer than two elements.
  /// </returns>
  public static IAsyncEnumerable<TSource> Scan<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator == null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

#if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          // NOTE(review): the `false` argument presumably means continueOnCapturedContext: false - confirm.
          var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

          try // REVIEW: Can use `await using` if we get pattern bind (HAS_AWAIT_USING_PATTERN_BIND)
          {
              // An empty source falls straight through to the finally block without yielding.
              if (await enumerator.MoveNextAsync())
              {
                  TSource running = enumerator.Current;

                  while (await enumerator.MoveNextAsync())
                  {
                      running = accumulator(running, enumerator.Current);
                      yield return running;
                  }
              }
          }
          finally
          {
              await enumerator.DisposeAsync();
          }
      }
#else
      return new ScanAsyncEnumerable<TSource>(source, accumulator);
#endif
  }
  /// <summary>
  /// Produces the running accumulation of <paramref name="source"/>, starting from <paramref name="seed"/>.
  /// One result is yielded per source element; the seed itself is never yielded.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements.</typeparam>
  /// <typeparam name="TAccumulate">Type of the accumulator and of the yielded values.</typeparam>
  /// <param name="source">Sequence to accumulate over. Must not be null.</param>
  /// <param name="seed">Initial accumulator value.</param>
  /// <param name="accumulator">Combines the current accumulated value with the next element. Must not be null.</param>
  /// <returns>The accumulator value after each source element has been applied.</returns>
  public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator == null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

#if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TAccumulate> Core(CancellationToken cancellationToken)
      {
          TAccumulate running = seed;

          await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              running = accumulator(running, element);
              yield return running;
          }
      }
#else
      return new ScanAsyncEnumerable<TSource, TAccumulate>(source, seed, accumulator);
#endif
  }
  /// <summary>
  /// Produces the running accumulation of <paramref name="source"/> using an asynchronous accumulator. The first
  /// element only primes the accumulator; one result is yielded for each element after it.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements and of the yielded accumulator values.</typeparam>
  /// <param name="source">Sequence to accumulate over. Must not be null.</param>
  /// <param name="accumulator">
  /// Asynchronously combines the current accumulated value with the next element. Must not be null.
  /// </param>
  /// <returns>
  /// The intermediate accumulator values; nothing is yielded when the source has fewer than two elements.
  /// </returns>
  public static IAsyncEnumerable<TSource> Scan<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator)
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator == null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

#if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          // NOTE(review): the `false` argument presumably means continueOnCapturedContext: false - confirm.
          var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

          try // REVIEW: Can use `await using` if we get pattern bind (HAS_AWAIT_USING_PATTERN_BIND)
          {
              // An empty source falls straight through to the finally block without yielding.
              if (await enumerator.MoveNextAsync())
              {
                  TSource running = enumerator.Current;

                  while (await enumerator.MoveNextAsync())
                  {
                      running = await accumulator(running, enumerator.Current).ConfigureAwait(false);
                      yield return running;
                  }
              }
          }
          finally
          {
              await enumerator.DisposeAsync();
          }
      }
#else
      return new ScanAsyncEnumerableWithTask<TSource>(source, accumulator);
#endif
  }
  102. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Produces the running accumulation of <paramref name="source"/> using an asynchronous, cancellable
  /// accumulator. The first element only primes the accumulator; one result is yielded for each element after it.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements and of the yielded accumulator values.</typeparam>
  /// <param name="source">Sequence to accumulate over. Must not be null.</param>
  /// <param name="accumulator">
  /// Asynchronously combines the current accumulated value with the next element; it receives the enumeration's
  /// cancellation token. Must not be null.
  /// </param>
  /// <returns>
  /// The intermediate accumulator values; nothing is yielded when the source has fewer than two elements.
  /// </returns>
  public static IAsyncEnumerable<TSource> Scan<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator)
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator == null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

#if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          // NOTE(review): the `false` argument presumably means continueOnCapturedContext: false - confirm.
          var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

          try // REVIEW: Can use `await using` if we get pattern bind (HAS_AWAIT_USING_PATTERN_BIND)
          {
              // An empty source falls straight through to the finally block without yielding.
              if (await enumerator.MoveNextAsync())
              {
                  TSource running = enumerator.Current;

                  while (await enumerator.MoveNextAsync())
                  {
                      running = await accumulator(running, enumerator.Current, cancellationToken).ConfigureAwait(false);
                      yield return running;
                  }
              }
          }
          finally
          {
              await enumerator.DisposeAsync();
          }
      }
#else
      return new ScanAsyncEnumerableWithTaskAndCancellation<TSource>(source, accumulator);
#endif
  }
  136. #endif
  /// <summary>
  /// Produces the running accumulation of <paramref name="source"/>, starting from <paramref name="seed"/> and
  /// using an asynchronous accumulator. One result is yielded per source element; the seed itself is never yielded.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements.</typeparam>
  /// <typeparam name="TAccumulate">Type of the accumulator and of the yielded values.</typeparam>
  /// <param name="source">Sequence to accumulate over. Must not be null.</param>
  /// <param name="seed">Initial accumulator value.</param>
  /// <param name="accumulator">
  /// Asynchronously combines the current accumulated value with the next element. Must not be null.
  /// </param>
  /// <returns>The accumulator value after each source element has been applied.</returns>
  public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator)
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator == null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

#if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TAccumulate> Core(CancellationToken cancellationToken)
      {
          TAccumulate running = seed;

          await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              running = await accumulator(running, element).ConfigureAwait(false);
              yield return running;
          }
      }
#else
      return new ScanAsyncEnumerableWithTask<TSource, TAccumulate>(source, seed, accumulator);
#endif
  }
  158. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Produces the running accumulation of <paramref name="source"/>, starting from <paramref name="seed"/> and
  /// using an asynchronous, cancellable accumulator. One result is yielded per source element; the seed itself is
  /// never yielded.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements.</typeparam>
  /// <typeparam name="TAccumulate">Type of the accumulator and of the yielded values.</typeparam>
  /// <param name="source">Sequence to accumulate over. Must not be null.</param>
  /// <param name="seed">Initial accumulator value.</param>
  /// <param name="accumulator">
  /// Asynchronously combines the current accumulated value with the next element; it receives the enumeration's
  /// cancellation token. Must not be null.
  /// </param>
  /// <returns>The accumulator value after each source element has been applied.</returns>
  public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator)
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator == null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

#if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TAccumulate> Core(CancellationToken cancellationToken)
      {
          TAccumulate running = seed;

          await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              running = await accumulator(running, element, cancellationToken).ConfigureAwait(false);
              yield return running;
          }
      }
#else
      return new ScanAsyncEnumerableWithTaskAndCancellation<TSource, TAccumulate>(source, seed, accumulator);
#endif
  }
  180. #endif
  181. #if !USE_ASYNC_ITERATOR
  /// <summary>
  /// Hand-written iterator behind the seedless <c>Scan</c> overload that takes a synchronous accumulator.
  /// Compiled only when <c>USE_ASYNC_ITERATOR</c> is not defined.
  /// </summary>
  private sealed class ScanAsyncEnumerable<TSource> : AsyncIterator<TSource>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly Func<TSource, TSource, TSource> _accumulator;

      private IAsyncEnumerator<TSource> _enumerator;
      private TSource _accumulated;

      // True once the first source element has been captured in _accumulated.
      private bool _hasSeed;

      public ScanAsyncEnumerable(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator)
      {
          Debug.Assert(source != null);
          Debug.Assert(accumulator != null);

          _source = source;
          _accumulator = accumulator;
      }

      /// <summary>Creates a fresh iterator over the same source and accumulator.</summary>
      public override AsyncIteratorBase<TSource> Clone() => new ScanAsyncEnumerable<TSource>(_source, _accumulator);

      /// <summary>Disposes the source enumerator (if one was obtained), clears the accumulated value, then defers to the base.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _accumulated = default;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next accumulated value. Returns false, after disposing, once the source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _hasSeed = false;
              _accumulated = default;
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating)
          {
              // The very first source element only primes the accumulator and is not surfaced.
              if (!_hasSeed && await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var first = _enumerator.Current;
                  _hasSeed = true;
                  _accumulated = first;
              }

              // Short-circuits without pulling again when the source turned out to be empty.
              if (_hasSeed && await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var next = _enumerator.Current;
                  _accumulated = _accumulator(_accumulated, next);
                  _current = _accumulated;
                  return true;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Hand-written iterator behind the seeded <c>Scan</c> overload that takes a synchronous accumulator.
  /// Compiled only when <c>USE_ASYNC_ITERATOR</c> is not defined.
  /// </summary>
  private sealed class ScanAsyncEnumerable<TSource, TAccumulate> : AsyncIterator<TAccumulate>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly TAccumulate _seed;
      private readonly Func<TAccumulate, TSource, TAccumulate> _accumulator;

      private IAsyncEnumerator<TSource> _enumerator;
      private TAccumulate _accumulated;

      public ScanAsyncEnumerable(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
      {
          Debug.Assert(source != null);
          Debug.Assert(accumulator != null);

          _source = source;
          _seed = seed;
          _accumulator = accumulator;
      }

      /// <summary>Creates a fresh iterator over the same source, seed and accumulator.</summary>
      public override AsyncIteratorBase<TAccumulate> Clone() => new ScanAsyncEnumerable<TSource, TAccumulate>(_source, _seed, _accumulator);

      /// <summary>Disposes the source enumerator (if one was obtained), clears the accumulated value, then defers to the base.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _accumulated = default;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next accumulated value. Returns false, after disposing, once the source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _accumulated = _seed; // every enumeration starts again from the seed
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating)
          {
              if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var next = _enumerator.Current;
                  _accumulated = _accumulator(_accumulated, next);
                  _current = _accumulated;
                  return true;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Hand-written iterator behind the seedless <c>Scan</c> overload that takes a <c>ValueTask</c>-returning
  /// accumulator. Compiled only when <c>USE_ASYNC_ITERATOR</c> is not defined.
  /// </summary>
  private sealed class ScanAsyncEnumerableWithTask<TSource> : AsyncIterator<TSource>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly Func<TSource, TSource, ValueTask<TSource>> _accumulator;

      private IAsyncEnumerator<TSource> _enumerator;
      private TSource _accumulated;

      // True once the first source element has been captured in _accumulated.
      private bool _hasSeed;

      public ScanAsyncEnumerableWithTask(IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator)
      {
          Debug.Assert(source != null);
          Debug.Assert(accumulator != null);

          _source = source;
          _accumulator = accumulator;
      }

      /// <summary>Creates a fresh iterator over the same source and accumulator.</summary>
      public override AsyncIteratorBase<TSource> Clone() => new ScanAsyncEnumerableWithTask<TSource>(_source, _accumulator);

      /// <summary>Disposes the source enumerator (if one was obtained), clears the accumulated value, then defers to the base.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _accumulated = default;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next accumulated value. Returns false, after disposing, once the source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _hasSeed = false;
              _accumulated = default;
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating)
          {
              // The very first source element only primes the accumulator and is not surfaced.
              if (!_hasSeed && await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var first = _enumerator.Current;
                  _hasSeed = true;
                  _accumulated = first;
              }

              // Short-circuits without pulling again when the source turned out to be empty.
              if (_hasSeed && await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var next = _enumerator.Current;
                  _accumulated = await _accumulator(_accumulated, next).ConfigureAwait(false);
                  _current = _accumulated;
                  return true;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  350. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Hand-written iterator behind the seedless <c>Scan</c> overload whose accumulator returns a <c>ValueTask</c>
  /// and accepts a <c>CancellationToken</c>. Compiled only when <c>USE_ASYNC_ITERATOR</c> is not defined.
  /// </summary>
  private sealed class ScanAsyncEnumerableWithTaskAndCancellation<TSource> : AsyncIterator<TSource>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly Func<TSource, TSource, CancellationToken, ValueTask<TSource>> _accumulator;

      private IAsyncEnumerator<TSource> _enumerator;
      private TSource _accumulated;

      // True once the first source element has been captured in _accumulated.
      private bool _hasSeed;

      public ScanAsyncEnumerableWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator)
      {
          Debug.Assert(source != null);
          Debug.Assert(accumulator != null);

          _source = source;
          _accumulator = accumulator;
      }

      /// <summary>Creates a fresh iterator over the same source and accumulator.</summary>
      public override AsyncIteratorBase<TSource> Clone() => new ScanAsyncEnumerableWithTaskAndCancellation<TSource>(_source, _accumulator);

      /// <summary>Disposes the source enumerator (if one was obtained), clears the accumulated value, then defers to the base.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _accumulated = default;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next accumulated value. Returns false, after disposing, once the source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _hasSeed = false;
              _accumulated = default;
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating)
          {
              // The very first source element only primes the accumulator and is not surfaced.
              if (!_hasSeed && await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var first = _enumerator.Current;
                  _hasSeed = true;
                  _accumulated = first;
              }

              // Short-circuits without pulling again when the source turned out to be empty.
              if (_hasSeed && await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var next = _enumerator.Current;
                  _accumulated = await _accumulator(_accumulated, next, _cancellationToken).ConfigureAwait(false);
                  _current = _accumulated;
                  return true;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  409. #endif
  /// <summary>
  /// Hand-written iterator behind the seeded <c>Scan</c> overload that takes a <c>ValueTask</c>-returning
  /// accumulator. Compiled only when <c>USE_ASYNC_ITERATOR</c> is not defined.
  /// </summary>
  private sealed class ScanAsyncEnumerableWithTask<TSource, TAccumulate> : AsyncIterator<TAccumulate>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly TAccumulate _seed;
      private readonly Func<TAccumulate, TSource, ValueTask<TAccumulate>> _accumulator;

      private IAsyncEnumerator<TSource> _enumerator;
      private TAccumulate _accumulated;

      public ScanAsyncEnumerableWithTask(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator)
      {
          Debug.Assert(source != null);
          Debug.Assert(accumulator != null);

          _source = source;
          _seed = seed;
          _accumulator = accumulator;
      }

      /// <summary>Creates a fresh iterator over the same source, seed and accumulator.</summary>
      public override AsyncIteratorBase<TAccumulate> Clone() => new ScanAsyncEnumerableWithTask<TSource, TAccumulate>(_source, _seed, _accumulator);

      /// <summary>Disposes the source enumerator (if one was obtained), clears the accumulated value, then defers to the base.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _accumulated = default;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next accumulated value. Returns false, after disposing, once the source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _accumulated = _seed; // every enumeration starts again from the seed
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating)
          {
              if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var next = _enumerator.Current;
                  _accumulated = await _accumulator(_accumulated, next).ConfigureAwait(false);
                  _current = _accumulated;
                  return true;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  462. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Hand-written iterator behind the seeded <c>Scan</c> overload whose accumulator returns a <c>ValueTask</c>
  /// and accepts a <c>CancellationToken</c>. Compiled only when <c>USE_ASYNC_ITERATOR</c> is not defined.
  /// </summary>
  private sealed class ScanAsyncEnumerableWithTaskAndCancellation<TSource, TAccumulate> : AsyncIterator<TAccumulate>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly TAccumulate _seed;
      private readonly Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> _accumulator;

      private IAsyncEnumerator<TSource> _enumerator;
      private TAccumulate _accumulated;

      public ScanAsyncEnumerableWithTaskAndCancellation(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator)
      {
          Debug.Assert(source != null);
          Debug.Assert(accumulator != null);

          _source = source;
          _seed = seed;
          _accumulator = accumulator;
      }

      /// <summary>Creates a fresh iterator over the same source, seed and accumulator.</summary>
      public override AsyncIteratorBase<TAccumulate> Clone() => new ScanAsyncEnumerableWithTaskAndCancellation<TSource, TAccumulate>(_source, _seed, _accumulator);

      /// <summary>Disposes the source enumerator (if one was obtained), clears the accumulated value, then defers to the base.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _accumulated = default;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next accumulated value. Returns false, after disposing, once the source is exhausted.
      /// </summary>
      protected override async ValueTask<bool> MoveNextCore()
      {
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
              _accumulated = _seed; // every enumeration starts again from the seed
              _state = AsyncIteratorState.Iterating;
          }

          if (_state == AsyncIteratorState.Iterating)
          {
              if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var next = _enumerator.Current;
                  _accumulated = await _accumulator(_accumulated, next, _cancellationToken).ConfigureAwait(false);
                  _current = _accumulated;
                  return true;
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  515. #endif
  516. #endif
  517. }
  518. }