Aggregate.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. #if REFERENCE_ASSEMBLY
  10. public static partial class AsyncEnumerableDeprecated
  11. #else
  12. public static partial class AsyncEnumerable
  13. #endif
  14. {
  15. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  16. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.aggregateasync?view=net-9.0-pp#system-linq-asyncenumerable-aggregateasync-1(system-collections-generic-iasyncenumerable((-0))-system-func((-0-0-0))-system-threading-cancellationtoken)
  17. /// <summary>
  18. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence.
  19. /// For aggregation behavior with incremental intermediate results, see System.Interactive.Async.AsyncEnumerableEx.Scan{TSource}.
  20. /// </summary>
  21. /// <typeparam name="TSource">The type of the elements in the source sequence and the result of the aggregation.</typeparam>
  22. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  23. /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  24. /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
  25. /// <returns>An async-enumerable sequence containing a single element with the final accumulator value.</returns>
  26. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
  27. /// <exception cref="InvalidOperationException">(Asynchronous) The source sequence is empty.</exception>
  28. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  29. public static ValueTask<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken = default)
  30. {
  31. if (source == null)
  32. throw Error.ArgumentNull(nameof(source));
  33. if (accumulator == null)
  34. throw Error.ArgumentNull(nameof(accumulator));
  35. return Core(source, accumulator, cancellationToken);
  36. static async ValueTask<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
  37. {
  38. await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
  39. if (!await e.MoveNextAsync())
  40. {
  41. throw Error.NoElements();
  42. }
  43. var acc = e.Current;
  44. while (await e.MoveNextAsync())
  45. {
  46. acc = accumulator(acc, e.Current);
  47. }
  48. return acc;
  49. }
  50. }
  51. #endif
  52. /// <summary>
  53. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence.
  54. /// </summary>
  55. /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  56. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  57. /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
  58. /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  59. /// <returns>A ValueTask containing the final accumulator value.</returns>
  60. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  61. /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  62. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  63. [GenerateAsyncOverload]
  64. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitAsync now exists as overloads of AggregateAsync.")]
  65. private static ValueTask<TSource> AggregateAwaitAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  66. {
  67. if (source == null)
  68. throw Error.ArgumentNull(nameof(source));
  69. if (accumulator == null)
  70. throw Error.ArgumentNull(nameof(accumulator));
  71. return Core(source, accumulator, cancellationToken);
  72. static async ValueTask<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
  73. {
  74. await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
  75. if (!await e.MoveNextAsync())
  76. {
  77. throw Error.NoElements();
  78. }
  79. var acc = e.Current;
  80. while (await e.MoveNextAsync())
  81. {
  82. acc = await accumulator(acc, e.Current).ConfigureAwait(false);
  83. }
  84. return acc;
  85. }
  86. }
  87. #if !NO_DEEP_CANCELLATION
// NOTE(review): the link and signatures below describe the seeded overload with a result selector
// (type parameters TSource, TAccumulate, TResult), i.e. the counterpart of
// AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate, TResult> further down in this file,
// not the single-type-parameter overload that immediately follows this comment.
// https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.aggregateasync?view=net-9.0-pp#system-linq-asyncenumerable-aggregateasync-3(system-collections-generic-iasyncenumerable((-0))-1-system-func((-1-0-system-threading-cancellationtoken-system-threading-tasks-valuetask((-1))))-system-func((-1-system-threading-cancellationtoken-system-threading-tasks-valuetask((-2))))-system-threading-cancellationtoken)
// public static ValueTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(
// this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> func, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default);
// Corresponds to:
// public static ValueTask<TResult> AggregateAwaitWithCancellationAsync<TSource, TAccumulate, TResult>(
// this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default) { }
  94. [GenerateAsyncOverload]
  95. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitWithCancellationAsync functionality now exists as overloads of AggregateAsync.")]
  96. private static ValueTask<TSource> AggregateAwaitWithCancellationAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  97. {
  98. if (source == null)
  99. throw Error.ArgumentNull(nameof(source));
  100. if (accumulator == null)
  101. throw Error.ArgumentNull(nameof(accumulator));
  102. return Core(source, accumulator, cancellationToken);
  103. static async ValueTask<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
  104. {
  105. await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
  106. if (!await e.MoveNextAsync())
  107. {
  108. throw Error.NoElements();
  109. }
  110. var acc = e.Current;
  111. while (await e.MoveNextAsync())
  112. {
  113. acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
  114. }
  115. return acc;
  116. }
  117. }
  118. #endif
  119. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  120. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.aggregateasync?view=net-9.0-pp#system-linq-asyncenumerable-aggregateasync-2(system-collections-generic-iasyncenumerable((-0))-1-system-func((-1-0-1))-system-threading-cancellationtoken)
  121. //public static ValueTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, CancellationToken cancellationToken = default);
  122. /// <summary>
  123. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
  124. /// For aggregation behavior with incremental intermediate results, see System.Interactive.Async.AsyncEnumerableEx.Scan{TSource, Accumulate}".
  125. /// </summary>
  126. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  127. /// <typeparam name="TAccumulate">The type of the result of the aggregation.</typeparam>
  128. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  129. /// <param name="seed">The initial accumulator value.</param>
  130. /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  131. /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
  132. /// <returns>An async-enumerable sequence containing a single element with the final accumulator value.</returns>
  133. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
  134. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  135. public static ValueTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken = default)
  136. {
  137. if (source == null)
  138. throw Error.ArgumentNull(nameof(source));
  139. if (accumulator == null)
  140. throw Error.ArgumentNull(nameof(accumulator));
  141. return Core(source, seed, accumulator, cancellationToken);
  142. static async ValueTask<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken)
  143. {
  144. var acc = seed;
  145. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  146. {
  147. acc = accumulator(acc, item);
  148. }
  149. return acc;
  150. }
  151. }
  152. #endif
  153. /// <summary>
  154. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
  155. /// </summary>
  156. /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  157. /// <typeparam name="TAccumulate">The type of the result of aggregation.</typeparam>
  158. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  159. /// <param name="seed">The initial accumulator value.</param>
  160. /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
  161. /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  162. /// <returns>A ValueTask containing the final accumulator value.</returns>
  163. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
  164. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  165. [GenerateAsyncOverload]
  166. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitAsync functionality now exists as overloads of AggregateAsync.")]
  167. private static ValueTask<TAccumulate> AggregateAwaitAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  168. {
  169. if (source == null)
  170. throw Error.ArgumentNull(nameof(source));
  171. if (accumulator == null)
  172. throw Error.ArgumentNull(nameof(accumulator));
  173. return Core(source, seed, accumulator, cancellationToken);
  174. static async ValueTask<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
  175. {
  176. var acc = seed;
  177. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  178. {
  179. acc = await accumulator(acc, item).ConfigureAwait(false);
  180. }
  181. return acc;
  182. }
  183. }
  184. #if !NO_DEEP_CANCELLATION
  185. [GenerateAsyncOverload]
  186. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitWithCancellationAsync functionality now exists as overloads of AggregateAsync.")]
  187. private static ValueTask<TAccumulate> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  188. {
  189. if (source == null)
  190. throw Error.ArgumentNull(nameof(source));
  191. if (accumulator == null)
  192. throw Error.ArgumentNull(nameof(accumulator));
  193. return Core(source, seed, accumulator, cancellationToken);
  194. static async ValueTask<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
  195. {
  196. var acc = seed;
  197. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  198. {
  199. acc = await accumulator(acc, item, cancellationToken).ConfigureAwait(false);
  200. }
  201. return acc;
  202. }
  203. }
  204. #endif
  205. #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
  206. // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.aggregateasync?view=net-9.0-pp#system-linq-asyncenumerable-aggregateasync-3(system-collections-generic-iasyncenumerable((-0))-1-system-func((-1-0-1))-system-func((-1-2))-system-threading-cancellationtoken)
  207. // public static ValueTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default);
  208. /// <summary>
  209. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value,
  210. /// and the specified result selector function is used to select the result value.
  211. /// </summary>
  212. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  213. /// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
  214. /// <typeparam name="TResult">The type of the resulting value.</typeparam>
  215. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  216. /// <param name="seed">The initial accumulator value.</param>
  217. /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  218. /// <param name="resultSelector">A function to transform the final accumulator value into the result value.</param>
  219. /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
  220. /// <returns>An async-enumerable sequence containing a single element with the final accumulator value.</returns>
  221. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is null.</exception>
  222. /// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
  223. public static ValueTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
  224. {
  225. if (source == null)
  226. throw Error.ArgumentNull(nameof(source));
  227. if (accumulator == null)
  228. throw Error.ArgumentNull(nameof(accumulator));
  229. if (resultSelector == null)
  230. throw Error.ArgumentNull(nameof(resultSelector));
  231. return Core(source, seed, accumulator, resultSelector, cancellationToken);
  232. static async ValueTask<TResult> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
  233. {
  234. var acc = seed;
  235. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  236. {
  237. acc = accumulator(acc, item);
  238. }
  239. return resultSelector(acc);
  240. }
  241. }
  242. #endif
  243. /// <summary>
  244. /// Applies an accumulator function over an async-enumerable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value,
  245. /// and the specified result selector is used to select the result value.
  246. /// </summary>
  247. /// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
  248. /// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
  249. /// <typeparam name="TResult">The type of the resulting value.</typeparam>
  250. /// <param name="source">An async-enumerable sequence to aggregate over.</param>
  251. /// <param name="seed">The initial accumulator value.</param>
  252. /// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
  253. /// <param name="resultSelector">An asynchronous transform function to transform the final accumulator value into the result value.</param>
  254. /// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
  255. /// <returns>A ValueTask containing the value obtained by applying the result selector to the final accumulator value.</returns>
  256. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
  257. [GenerateAsyncOverload]
  258. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitAsync functionality now exists as overloads of AggregateAsync.")]
  259. private static ValueTask<TResult> AggregateAwaitAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  260. {
  261. if (source == null)
  262. throw Error.ArgumentNull(nameof(source));
  263. if (accumulator == null)
  264. throw Error.ArgumentNull(nameof(accumulator));
  265. if (resultSelector == null)
  266. throw Error.ArgumentNull(nameof(resultSelector));
  267. return Core(source, seed, accumulator, resultSelector, cancellationToken);
  268. static async ValueTask<TResult> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
  269. {
  270. var acc = seed;
  271. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  272. {
  273. acc = await accumulator(acc, item).ConfigureAwait(false);
  274. }
  275. return await resultSelector(acc).ConfigureAwait(false);
  276. }
  277. }
  278. #if !NO_DEEP_CANCELLATION
  279. [GenerateAsyncOverload]
  280. [Obsolete("Use AggregateAsync. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the AggregateAwaitWithCancellationAsync functionality now exists as overloads of AggregateAsync.")]
  281. private static ValueTask<TResult> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  282. {
  283. if (source == null)
  284. throw Error.ArgumentNull(nameof(source));
  285. if (accumulator == null)
  286. throw Error.ArgumentNull(nameof(accumulator));
  287. if (resultSelector == null)
  288. throw Error.ArgumentNull(nameof(resultSelector));
  289. return Core(source, seed, accumulator, resultSelector, cancellationToken);
  290. static async ValueTask<TResult> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
  291. {
  292. var acc = seed;
  293. await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  294. {
  295. acc = await accumulator(acc, item, cancellationToken).ConfigureAwait(false);
  296. }
  297. return await resultSelector(acc, cancellationToken).ConfigureAwait(false);
  298. }
  299. }
  300. #endif
  301. }
  302. }