Aggregate.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  12. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.aggregateasync?view=net-9.0-pp#system-linq-asyncenumerable-aggregateasync-1(system-collections-generic-iasyncenumerable((-0))-system-func((-0-0-0))-system-threading-cancellationtoken)
  13. /// <summary>
  14. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence.
  15. /// For aggregation behavior with incremental intermediate results, see System.Interactive.Async.AsyncEnumerableEx.Scan{TSource}.
  16. /// </summary>
  17. /// <typeparam name="TSource">The type of the elements in the source sequence and the result of the aggregation.</typeparam>
  18. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  19. /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  20. /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
  21. /// <returns>An async-enumerable sequence containing a single element with the final accumulator value.</returns>
  22. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
  23. /// <exception cref="InvalidOperationException">(Asynchronous) The source sequence is empty.</exception>
  24. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  25. public static ValueTask<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken = default)
  26. {
  27. if (source == null)
  28. throw Error.ArgumentNull(nameof(source));
  29. if (accumulator == null)
  30. throw Error.ArgumentNull(nameof(accumulator));
  31. return Core(source, accumulator, cancellationToken);
  32. static async ValueTask<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
  33. {
  34. await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
  35. if (!await e.MoveNextAsync())
  36. {
  37. throw Error.NoElements();
  38. }
  39. var acc = e.Current;
  40. while (await e.MoveNextAsync())
  41. {
  42. acc = accumulator(acc, e.Current);
  43. }
  44. return acc;
  45. }
  46. }
  47. #endif
  48. /// <summary>
  49. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence.
  50. /// </summary>
  51. /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  52. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  53. /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
  54. /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  55. /// <returns>A ValueTask containing the final accumulator value.</returns>
  56. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  57. /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  58. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  59. [GenerateAsyncOverload]
  60. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitAsync now exists as overloads of AggregateAsync.")]
  61. private static ValueTask<TSource> AggregateAwaitAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  62. {
  63. if (source == null)
  64. throw Error.ArgumentNull(nameof(source));
  65. if (accumulator == null)
  66. throw Error.ArgumentNull(nameof(accumulator));
  67. return Core(source, accumulator, cancellationToken);
  68. static async ValueTask<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
  69. {
  70. await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
  71. if (!await e.MoveNextAsync())
  72. {
  73. throw Error.NoElements();
  74. }
  75. var acc = e.Current;
  76. while (await e.MoveNextAsync())
  77. {
  78. acc = await accumulator(acc, e.Current).ConfigureAwait(false);
  79. }
  80. return acc;
  81. }
  82. }
  83. #if !NO_DEEP_CANCELLATION
  84. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.aggregateasync?view=net-9.0-pp#system-linq-asyncenumerable-aggregateasync-3(system-collections-generic-iasyncenumerable((-0))-1-system-func((-1-0-system-threading-cancellationtoken-system-threading-tasks-valuetask((-1))))-system-func((-1-system-threading-cancellationtoken-system-threading-tasks-valuetask((-2))))-system-threading-cancellationtoken)
  85. // public static ValueTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(
  86. // this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> func, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default);
  87. // Corresponds to:
  88. // public static ValueTask<TResult> AggregateAwaitWithCancellationAsync<TSource, TAccumulate, TResult>(
  89. // this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default) { }
  90. [GenerateAsyncOverload]
  91. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitWithCancellationAsync functionality now exists as overloads of AggregateAsync.")]
  92. private static ValueTask<TSource> AggregateAwaitWithCancellationAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  93. {
  94. if (source == null)
  95. throw Error.ArgumentNull(nameof(source));
  96. if (accumulator == null)
  97. throw Error.ArgumentNull(nameof(accumulator));
  98. return Core(source, accumulator, cancellationToken);
  99. static async ValueTask<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
  100. {
  101. await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
  102. if (!await e.MoveNextAsync())
  103. {
  104. throw Error.NoElements();
  105. }
  106. var acc = e.Current;
  107. while (await e.MoveNextAsync())
  108. {
  109. acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
  110. }
  111. return acc;
  112. }
  113. }
  114. #endif
  115. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  116. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.aggregateasync?view=net-9.0-pp#system-linq-asyncenumerable-aggregateasync-2(system-collections-generic-iasyncenumerable((-0))-1-system-func((-1-0-1))-system-threading-cancellationtoken)
  117. //public static ValueTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, CancellationToken cancellationToken = default);
  118. /// <summary>
  119. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
  120. /// For aggregation behavior with incremental intermediate results, see System.Interactive.Async.AsyncEnumerableEx.Scan{TSource, Accumulate}".
  121. /// </summary>
  122. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  123. /// <typeparam name="TAccumulate">The type of the result of the aggregation.</typeparam>
  124. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  125. /// <param name="seed">The initial accumulator value.</param>
  126. /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  127. /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
  128. /// <returns>An async-enumerable sequence containing a single element with the final accumulator value.</returns>
  129. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
  130. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  131. public static ValueTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken = default)
  132. {
  133. if (source == null)
  134. throw Error.ArgumentNull(nameof(source));
  135. if (accumulator == null)
  136. throw Error.ArgumentNull(nameof(accumulator));
  137. return Core(source, seed, accumulator, cancellationToken);
  138. static async ValueTask<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken)
  139. {
  140. var acc = seed;
  141. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  142. {
  143. acc = accumulator(acc, item);
  144. }
  145. return acc;
  146. }
  147. }
  148. #endif
  149. /// <summary>
  150. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
  151. /// </summary>
  152. /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  153. /// <typeparam name="TAccumulate">The type of the result of aggregation.</typeparam>
  154. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  155. /// <param name="seed">The initial accumulator value.</param>
  156. /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
  157. /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  158. /// <returns>A ValueTask containing the final accumulator value.</returns>
  159. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  160. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  161. [GenerateAsyncOverload]
  162. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitAsync functionality now exists as overloads of AggregateAsync.")]
  163. private static ValueTask<TAccumulate> AggregateAwaitAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  164. {
  165. if (source == null)
  166. throw Error.ArgumentNull(nameof(source));
  167. if (accumulator == null)
  168. throw Error.ArgumentNull(nameof(accumulator));
  169. return Core(source, seed, accumulator, cancellationToken);
  170. static async ValueTask<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
  171. {
  172. var acc = seed;
  173. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  174. {
  175. acc = await accumulator(acc, item).ConfigureAwait(false);
  176. }
  177. return acc;
  178. }
  179. }
  180. #if !NO_DEEP_CANCELLATION
  181. [GenerateAsyncOverload]
  182. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitWithCancellationAsync functionality now exists as overloads of AggregateAsync.")]
  183. private static ValueTask<TAccumulate> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  184. {
  185. if (source == null)
  186. throw Error.ArgumentNull(nameof(source));
  187. if (accumulator == null)
  188. throw Error.ArgumentNull(nameof(accumulator));
  189. return Core(source, seed, accumulator, cancellationToken);
  190. static async ValueTask<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
  191. {
  192. var acc = seed;
  193. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  194. {
  195. acc = await accumulator(acc, item, cancellationToken).ConfigureAwait(false);
  196. }
  197. return acc;
  198. }
  199. }
  200. #endif
  201. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  202. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.aggregateasync?view=net-9.0-pp#system-linq-asyncenumerable-aggregateasync-3(system-collections-generic-iasyncenumerable((-0))-1-system-func((-1-0-1))-system-func((-1-2))-system-threading-cancellationtoken)
  203. // public static ValueTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default);
  204. /// <summary>
  205. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value,
  206. /// and the specified result selector function is used to select the result value.
  207. /// </summary>
  208. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  209. /// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
  210. /// <typeparam name="TResult">The type of the resulting value.</typeparam>
  211. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  212. /// <param name="seed">The initial accumulator value.</param>
  213. /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  214. /// <param name="resultSelector">A function to transform the final accumulator value into the result value.</param>
  215. /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
  216. /// <returns>An async-enumerable sequence containing a single element with the final accumulator value.</returns>
  217. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is null.</exception>
  218. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  219. public static ValueTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
  220. {
  221. if (source == null)
  222. throw Error.ArgumentNull(nameof(source));
  223. if (accumulator == null)
  224. throw Error.ArgumentNull(nameof(accumulator));
  225. if (resultSelector == null)
  226. throw Error.ArgumentNull(nameof(resultSelector));
  227. return Core(source, seed, accumulator, resultSelector, cancellationToken);
  228. static async ValueTask<TResult> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
  229. {
  230. var acc = seed;
  231. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  232. {
  233. acc = accumulator(acc, item);
  234. }
  235. return resultSelector(acc);
  236. }
  237. }
  238. #endif
  239. /// <summary>
  240. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value,
  241. /// and the specified result selector is used to select the result value.
  242. /// </summary>
  243. /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  244. /// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
  245. /// <typeparam name="TResult">The type of the resulting value.</typeparam>
  246. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  247. /// <param name="seed">The initial accumulator value.</param>
  248. /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
  249. /// <param name="resultSelector">An asynchronous transform function to transform the final accumulator value into the result value.</param>
  250. /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  251. /// <returns>A ValueTask containing the value obtained by applying the result selector to the final accumulator value.</returns>
  252. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
  253. [GenerateAsyncOverload]
  254. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitAsync functionality now exists as overloads of AggregateAsync.")]
  255. private static ValueTask<TResult> AggregateAwaitAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  256. {
  257. if (source == null)
  258. throw Error.ArgumentNull(nameof(source));
  259. if (accumulator == null)
  260. throw Error.ArgumentNull(nameof(accumulator));
  261. if (resultSelector == null)
  262. throw Error.ArgumentNull(nameof(resultSelector));
  263. return Core(source, seed, accumulator, resultSelector, cancellationToken);
  264. static async ValueTask<TResult> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
  265. {
  266. var acc = seed;
  267. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  268. {
  269. acc = await accumulator(acc, item).ConfigureAwait(false);
  270. }
  271. return await resultSelector(acc).ConfigureAwait(false);
  272. }
  273. }
  274. #if !NO_DEEP_CANCELLATION
  275. [GenerateAsyncOverload]
  276. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitWithCancellationAsync functionality now exists as overloads of AggregateAsync.")]
  277. private static ValueTask<TResult> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  278. {
  279. if (source == null)
  280. throw Error.ArgumentNull(nameof(source));
  281. if (accumulator == null)
  282. throw Error.ArgumentNull(nameof(accumulator));
  283. if (resultSelector == null)
  284. throw Error.ArgumentNull(nameof(resultSelector));
  285. return Core(source, seed, accumulator, resultSelector, cancellationToken);
  286. static async ValueTask<TResult> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
  287. {
  288. var acc = seed;
  289. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  290. {
  291. acc = await accumulator(acc, item, cancellationToken).ConfigureAwait(false);
  292. }
  293. return await resultSelector(acc, cancellationToken).ConfigureAwait(false);
  294. }
  295. }
  296. #endif
  297. }
  298. }