Aggregate.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying an accumulator function.
  /// The first element of the sequence is used as the initial accumulator value.
  /// For aggregation behavior with incremental intermediate results, see System.Interactive.Async.AsyncEnumerableEx.Scan{TSource}.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence and the result of the aggregation.</typeparam>
  /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  /// <param name="accumulator">An accumulator function to be invoked on each element after the first.</param>
  /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
  /// <returns>A ValueTask containing the final accumulator value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  /// <exception cref="InvalidOperationException">(Asynchronous) The source sequence is empty.</exception>
  /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  public static ValueTask<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken = default)
  {
      // Arguments are validated eagerly, before any asynchronous work starts.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, accumulator, cancellationToken);

      static async ValueTask<TSource> FoldAsync(IAsyncEnumerable<TSource> sequence, Func<TSource, TSource, TSource> combine, CancellationToken token)
      {
          await using var enumerator = sequence.GetConfiguredAsyncEnumerator(token, false);

          if (await enumerator.MoveNextAsync())
          {
              // The first element seeds the fold; every later element is combined into it.
              var running = enumerator.Current;

              while (await enumerator.MoveNextAsync())
              {
                  running = combine(running, enumerator.Current);
              }

              return running;
          }

          // Nothing to seed the fold with.
          throw Error.NoElements();
      }
  }
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying and awaiting an asynchronous accumulator function.
  /// The first element of the sequence is used as the initial accumulator value.
  /// </summary>
  /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element after the first.</param>
  /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  /// <returns>A ValueTask containing the final accumulator value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  [GenerateAsyncOverload]
  private static ValueTask<TSource> AggregateAwaitAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  {
      // Arguments are validated eagerly, before any asynchronous work starts.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, accumulator, cancellationToken);

      static async ValueTask<TSource> FoldAsync(IAsyncEnumerable<TSource> sequence, Func<TSource, TSource, ValueTask<TSource>> combine, CancellationToken token)
      {
          await using var enumerator = sequence.GetConfiguredAsyncEnumerator(token, false);

          if (await enumerator.MoveNextAsync())
          {
              // The first element seeds the fold; every later element is combined into it.
              var running = enumerator.Current;

              while (await enumerator.MoveNextAsync())
              {
                  running = await combine(running, enumerator.Current).ConfigureAwait(false);
              }

              return running;
          }

          // Nothing to seed the fold with.
          throw Error.NoElements();
      }
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying and awaiting an asynchronous, cancellable accumulator function.
  /// The first element of the sequence is used as the initial accumulator value.
  /// </summary>
  /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element after the first; it is passed <paramref name="cancellationToken"/>.</param>
  /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  /// <returns>A ValueTask containing the final accumulator value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  [GenerateAsyncOverload]
  private static ValueTask<TSource> AggregateAwaitWithCancellationAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  {
      // Arguments are validated eagerly, before any asynchronous work starts.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, accumulator, cancellationToken);

      static async ValueTask<TSource> FoldAsync(IAsyncEnumerable<TSource> sequence, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> combine, CancellationToken token)
      {
          await using var enumerator = sequence.GetConfiguredAsyncEnumerator(token, false);

          if (await enumerator.MoveNextAsync())
          {
              // The first element seeds the fold; every later element is combined into it.
              var running = enumerator.Current;

              while (await enumerator.MoveNextAsync())
              {
                  // The same token that drives enumeration is handed to the accumulator.
                  running = await combine(running, enumerator.Current, token).ConfigureAwait(false);
              }

              return running;
          }

          // Nothing to seed the fold with.
          throw Error.NoElements();
      }
  }
  #endif
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying an accumulator function, starting from the specified seed value.
  /// For aggregation behavior with incremental intermediate results, see System.Interactive.Async.AsyncEnumerableEx.Scan{TSource, TAccumulate}.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TAccumulate">The type of the result of the aggregation.</typeparam>
  /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  /// <param name="seed">The initial accumulator value.</param>
  /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
  /// <returns>A ValueTask containing the final accumulator value; this is <paramref name="seed"/> itself when the sequence yields no elements.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  public static ValueTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken = default)
  {
      // Arguments are validated eagerly, before any asynchronous work starts.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, seed, accumulator, cancellationToken);

      static async ValueTask<TAccumulate> FoldAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, TAccumulate> combine, CancellationToken token)
      {
          var state = initial;

          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              state = combine(state, element);
          }

          return state;
      }
  }
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying and awaiting an asynchronous accumulator function, starting from the specified seed value.
  /// </summary>
  /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  /// <typeparam name="TAccumulate">The type of the result of aggregation.</typeparam>
  /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  /// <param name="seed">The initial accumulator value.</param>
  /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
  /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  /// <returns>A ValueTask containing the final accumulator value; this is <paramref name="seed"/> itself when the sequence yields no elements.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  [GenerateAsyncOverload]
  private static ValueTask<TAccumulate> AggregateAwaitAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  {
      // Arguments are validated eagerly, before any asynchronous work starts.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, seed, accumulator, cancellationToken);

      static async ValueTask<TAccumulate> FoldAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, ValueTask<TAccumulate>> combine, CancellationToken token)
      {
          var state = initial;

          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              state = await combine(state, element).ConfigureAwait(false);
          }

          return state;
      }
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying and awaiting an asynchronous, cancellable accumulator function, starting from the specified seed value.
  /// </summary>
  /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  /// <typeparam name="TAccumulate">The type of the result of aggregation.</typeparam>
  /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  /// <param name="seed">The initial accumulator value.</param>
  /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element; it is passed <paramref name="cancellationToken"/>.</param>
  /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  /// <returns>A ValueTask containing the final accumulator value; this is <paramref name="seed"/> itself when the sequence yields no elements.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  [GenerateAsyncOverload]
  private static ValueTask<TAccumulate> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  {
      // Arguments are validated eagerly, before any asynchronous work starts.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, seed, accumulator, cancellationToken);

      static async ValueTask<TAccumulate> FoldAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> combine, CancellationToken token)
      {
          var state = initial;

          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              // The same token that drives enumeration is handed to the accumulator.
              state = await combine(state, element, token).ConfigureAwait(false);
          }

          return state;
      }
  }
  #endif
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying an accumulator function, starting from the specified seed value,
  /// and then projects the final accumulator value through the specified result selector.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
  /// <typeparam name="TResult">The type of the resulting value.</typeparam>
  /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  /// <param name="seed">The initial accumulator value.</param>
  /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  /// <param name="resultSelector">A function to transform the final accumulator value into the result value.</param>
  /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
  /// <returns>A ValueTask containing the value obtained by applying the result selector to the final accumulator value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
  /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  public static ValueTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
  {
      // Arguments are validated eagerly, before any asynchronous work starts.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      return FoldAsync(source, seed, accumulator, resultSelector, cancellationToken);

      static async ValueTask<TResult> FoldAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, TAccumulate> combine, Func<TAccumulate, TResult> project, CancellationToken token)
      {
          var state = initial;

          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              state = combine(state, element);
          }

          // The selector runs exactly once, after the sequence has been fully consumed.
          return project(state);
      }
  }
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying and awaiting an asynchronous accumulator function, starting from the specified seed value,
  /// and then projects the final accumulator value through the specified asynchronous result selector.
  /// </summary>
  /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  /// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
  /// <typeparam name="TResult">The type of the resulting value.</typeparam>
  /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  /// <param name="seed">The initial accumulator value.</param>
  /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
  /// <param name="resultSelector">An asynchronous transform function to transform the final accumulator value into the result value.</param>
  /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  /// <returns>A ValueTask containing the value obtained by applying the result selector to the final accumulator value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
  [GenerateAsyncOverload]
  private static ValueTask<TResult> AggregateAwaitAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  {
      // Arguments are validated eagerly, before any asynchronous work starts.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      return FoldAsync(source, seed, accumulator, resultSelector, cancellationToken);

      static async ValueTask<TResult> FoldAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, ValueTask<TAccumulate>> combine, Func<TAccumulate, ValueTask<TResult>> project, CancellationToken token)
      {
          var state = initial;

          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              state = await combine(state, element).ConfigureAwait(false);
          }

          // The selector runs exactly once, after the sequence has been fully consumed.
          return await project(state).ConfigureAwait(false);
      }
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying and awaiting an asynchronous, cancellable accumulator function, starting from the specified seed value,
  /// and then projects the final accumulator value through the specified asynchronous, cancellable result selector.
  /// </summary>
  /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  /// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
  /// <typeparam name="TResult">The type of the resulting value.</typeparam>
  /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  /// <param name="seed">The initial accumulator value.</param>
  /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element; it is passed <paramref name="cancellationToken"/>.</param>
  /// <param name="resultSelector">An asynchronous transform function to transform the final accumulator value into the result value; it is passed <paramref name="cancellationToken"/>.</param>
  /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  /// <returns>A ValueTask containing the value obtained by applying the result selector to the final accumulator value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
  [GenerateAsyncOverload]
  private static ValueTask<TResult> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  {
      // Arguments are validated eagerly, before any asynchronous work starts.
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      return FoldAsync(source, seed, accumulator, resultSelector, cancellationToken);

      static async ValueTask<TResult> FoldAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> combine, Func<TAccumulate, CancellationToken, ValueTask<TResult>> project, CancellationToken token)
      {
          var state = initial;

          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              // The same token that drives enumeration is handed to the accumulator.
              state = await combine(state, element, token).ConfigureAwait(false);
          }

          // The selector runs exactly once, after the sequence has been fully consumed.
          return await project(state, token).ConfigureAwait(false);
      }
  }
  #endif
  274. }
  275. }