Aggregate.cs 11 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying
  /// <paramref name="accumulator"/>, using the first element as the initial running value.
  /// </summary>
  /// <typeparam name="TSource">Type of the sequence elements and of the aggregated result.</typeparam>
  /// <param name="source">Sequence to aggregate.</param>
  /// <param name="accumulator">Function invoked with the running value and the next element; its result becomes the new running value.</param>
  /// <param name="cancellationToken">Token handed to the enumerator obtained from <paramref name="source"/>.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> that completes with the final accumulated value.</returns>
  /// <remarks>
  /// Null arguments are rejected synchronously, before any enumeration starts, with the exception
  /// built by <c>Error.ArgumentNull</c>. An empty sequence makes the returned task fail with the
  /// exception built by <c>Error.NoElements</c>.
  /// </remarks>
  public static ValueTask<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken = default)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, accumulator, cancellationToken);

      // The async state machine lives in a static local function so the argument checks above run eagerly.
      static async ValueTask<TSource> FoldAsync(IAsyncEnumerable<TSource> sequence, Func<TSource, TSource, TSource> fold, CancellationToken token)
      {
          // NOTE(review): the 'false' argument presumably disables context capture for the enumerator's awaits - confirm against GetConfiguredAsyncEnumerator.
          await using var enumerator = sequence.GetConfiguredAsyncEnumerator(token, false);

          if (await enumerator.MoveNextAsync())
          {
              // The first element seeds the running value; every later element is folded into it.
              var result = enumerator.Current;

              while (await enumerator.MoveNextAsync())
              {
                  result = fold(result, enumerator.Current);
              }

              return result;
          }

          // Nothing to seed the aggregation with.
          throw Error.NoElements();
      }
  }
  /// <summary>
  /// Folds an async-enumerable sequence into a single value using an asynchronous
  /// <paramref name="accumulator"/>, with the first element as the initial running value.
  /// </summary>
  /// <typeparam name="TSource">Type of the sequence elements and of the aggregated result.</typeparam>
  /// <param name="source">Sequence to aggregate.</param>
  /// <param name="accumulator">Asynchronous function invoked with the running value and the next element; its awaited result becomes the new running value.</param>
  /// <param name="cancellationToken">Token handed to the enumerator obtained from <paramref name="source"/>.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> that completes with the final accumulated value.</returns>
  /// <remarks>
  /// Null arguments are rejected synchronously, before any enumeration starts, with the exception
  /// built by <c>Error.ArgumentNull</c>. An empty sequence makes the returned task fail with the
  /// exception built by <c>Error.NoElements</c>.
  /// </remarks>
  internal static ValueTask<TSource> AggregateAwaitAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, accumulator, cancellationToken);

      // The async state machine lives in a static local function so the argument checks above run eagerly.
      static async ValueTask<TSource> FoldAsync(IAsyncEnumerable<TSource> sequence, Func<TSource, TSource, ValueTask<TSource>> fold, CancellationToken token)
      {
          // NOTE(review): the 'false' argument presumably disables context capture for the enumerator's awaits - confirm against GetConfiguredAsyncEnumerator.
          await using var enumerator = sequence.GetConfiguredAsyncEnumerator(token, false);

          if (await enumerator.MoveNextAsync())
          {
              // The first element seeds the running value; every later element is folded into it.
              var result = enumerator.Current;

              while (await enumerator.MoveNextAsync())
              {
                  // Each accumulator call is awaited without resuming on the captured context.
                  result = await fold(result, enumerator.Current).ConfigureAwait(false);
              }

              return result;
          }

          // Nothing to seed the aggregation with.
          throw Error.NoElements();
      }
  }
  55. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Folds an async-enumerable sequence into a single value using an asynchronous, cancellable
  /// <paramref name="accumulator"/>, with the first element as the initial running value.
  /// </summary>
  /// <typeparam name="TSource">Type of the sequence elements and of the aggregated result.</typeparam>
  /// <param name="source">Sequence to aggregate.</param>
  /// <param name="accumulator">Asynchronous function invoked with the running value, the next element and <paramref name="cancellationToken"/>; its awaited result becomes the new running value.</param>
  /// <param name="cancellationToken">Token handed both to the enumerator obtained from <paramref name="source"/> and to every <paramref name="accumulator"/> call.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> that completes with the final accumulated value.</returns>
  /// <remarks>
  /// Null arguments are rejected synchronously, before any enumeration starts, with the exception
  /// built by <c>Error.ArgumentNull</c>. An empty sequence makes the returned task fail with the
  /// exception built by <c>Error.NoElements</c>.
  /// </remarks>
  internal static ValueTask<TSource> AggregateAwaitWithCancellationAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, accumulator, cancellationToken);

      // The async state machine lives in a static local function so the argument checks above run eagerly.
      static async ValueTask<TSource> FoldAsync(IAsyncEnumerable<TSource> sequence, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> fold, CancellationToken token)
      {
          // NOTE(review): the 'false' argument presumably disables context capture for the enumerator's awaits - confirm against GetConfiguredAsyncEnumerator.
          await using var enumerator = sequence.GetConfiguredAsyncEnumerator(token, false);

          if (await enumerator.MoveNextAsync())
          {
              // The first element seeds the running value; every later element is folded into it.
              var result = enumerator.Current;

              while (await enumerator.MoveNextAsync())
              {
                  // The same token that drives enumeration is forwarded to the accumulator.
                  result = await fold(result, enumerator.Current, token).ConfigureAwait(false);
              }

              return result;
          }

          // Nothing to seed the aggregation with.
          throw Error.NoElements();
      }
  }
  78. #endif
  /// <summary>
  /// Folds an async-enumerable sequence into a single value by repeatedly applying
  /// <paramref name="accumulator"/>, starting from <paramref name="seed"/>.
  /// </summary>
  /// <typeparam name="TSource">Type of the sequence elements.</typeparam>
  /// <typeparam name="TAccumulate">Type of the running value and of the result.</typeparam>
  /// <param name="source">Sequence to aggregate.</param>
  /// <param name="seed">Initial running value; returned unchanged when the sequence yields no elements.</param>
  /// <param name="accumulator">Function invoked with the running value and the next element; its result becomes the new running value.</param>
  /// <param name="cancellationToken">Token applied to the enumeration of <paramref name="source"/>.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> that completes with the final accumulated value.</returns>
  /// <remarks>
  /// Null <paramref name="source"/> or <paramref name="accumulator"/> is rejected synchronously,
  /// before any enumeration starts, with the exception built by <c>Error.ArgumentNull</c>.
  /// </remarks>
  public static ValueTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken = default)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, seed, accumulator, cancellationToken);

      // The async state machine lives in a static local function so the argument checks above run eagerly.
      static async ValueTask<TAccumulate> FoldAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, TAccumulate> fold, CancellationToken token)
      {
          var result = initial;

          // Enumerate with the caller's token and without resuming on the captured context.
          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              result = fold(result, element);
          }

          return result;
      }
  }
  /// <summary>
  /// Folds an async-enumerable sequence into a single value using an asynchronous
  /// <paramref name="accumulator"/>, starting from <paramref name="seed"/>.
  /// </summary>
  /// <typeparam name="TSource">Type of the sequence elements.</typeparam>
  /// <typeparam name="TAccumulate">Type of the running value and of the result.</typeparam>
  /// <param name="source">Sequence to aggregate.</param>
  /// <param name="seed">Initial running value; returned unchanged when the sequence yields no elements.</param>
  /// <param name="accumulator">Asynchronous function invoked with the running value and the next element; its awaited result becomes the new running value.</param>
  /// <param name="cancellationToken">Token applied to the enumeration of <paramref name="source"/>.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> that completes with the final accumulated value.</returns>
  /// <remarks>
  /// Null <paramref name="source"/> or <paramref name="accumulator"/> is rejected synchronously,
  /// before any enumeration starts, with the exception built by <c>Error.ArgumentNull</c>.
  /// </remarks>
  internal static ValueTask<TAccumulate> AggregateAwaitAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, seed, accumulator, cancellationToken);

      // The async state machine lives in a static local function so the argument checks above run eagerly.
      static async ValueTask<TAccumulate> FoldAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, ValueTask<TAccumulate>> fold, CancellationToken token)
      {
          var result = initial;

          // Enumerate with the caller's token and without resuming on the captured context.
          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              // Each accumulator call is awaited without resuming on the captured context.
              result = await fold(result, element).ConfigureAwait(false);
          }

          return result;
      }
  }
  113. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Folds an async-enumerable sequence into a single value using an asynchronous, cancellable
  /// <paramref name="accumulator"/>, starting from <paramref name="seed"/>.
  /// </summary>
  /// <typeparam name="TSource">Type of the sequence elements.</typeparam>
  /// <typeparam name="TAccumulate">Type of the running value and of the result.</typeparam>
  /// <param name="source">Sequence to aggregate.</param>
  /// <param name="seed">Initial running value; returned unchanged when the sequence yields no elements.</param>
  /// <param name="accumulator">Asynchronous function invoked with the running value, the next element and <paramref name="cancellationToken"/>; its awaited result becomes the new running value.</param>
  /// <param name="cancellationToken">Token applied to the enumeration of <paramref name="source"/> and forwarded to every <paramref name="accumulator"/> call.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> that completes with the final accumulated value.</returns>
  /// <remarks>
  /// Null <paramref name="source"/> or <paramref name="accumulator"/> is rejected synchronously,
  /// before any enumeration starts, with the exception built by <c>Error.ArgumentNull</c>.
  /// </remarks>
  internal static ValueTask<TAccumulate> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      return FoldAsync(source, seed, accumulator, cancellationToken);

      // The async state machine lives in a static local function so the argument checks above run eagerly.
      static async ValueTask<TAccumulate> FoldAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> fold, CancellationToken token)
      {
          var result = initial;

          // Enumerate with the caller's token and without resuming on the captured context.
          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              // The same token that drives enumeration is forwarded to the accumulator.
              result = await fold(result, element, token).ConfigureAwait(false);
          }

          return result;
      }
  }
  131. #endif
  /// <summary>
  /// Folds an async-enumerable sequence into a running value by repeatedly applying
  /// <paramref name="accumulator"/> starting from <paramref name="seed"/>, then maps the final
  /// running value through <paramref name="resultSelector"/>.
  /// </summary>
  /// <typeparam name="TSource">Type of the sequence elements.</typeparam>
  /// <typeparam name="TAccumulate">Type of the running value.</typeparam>
  /// <typeparam name="TResult">Type of the value produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">Sequence to aggregate.</param>
  /// <param name="seed">Initial running value; passed straight to <paramref name="resultSelector"/> when the sequence yields no elements.</param>
  /// <param name="accumulator">Function invoked with the running value and the next element; its result becomes the new running value.</param>
  /// <param name="resultSelector">Function applied once, after enumeration finishes, to the final running value.</param>
  /// <param name="cancellationToken">Token applied to the enumeration of <paramref name="source"/>.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> that completes with the value returned by <paramref name="resultSelector"/>.</returns>
  /// <remarks>
  /// Null <paramref name="source"/>, <paramref name="accumulator"/> or <paramref name="resultSelector"/>
  /// is rejected synchronously, before any enumeration starts, with the exception built by
  /// <c>Error.ArgumentNull</c>.
  /// </remarks>
  public static ValueTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      return FoldAndProjectAsync(source, seed, accumulator, resultSelector, cancellationToken);

      // The async state machine lives in a static local function so the argument checks above run eagerly.
      static async ValueTask<TResult> FoldAndProjectAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, TAccumulate> fold, Func<TAccumulate, TResult> project, CancellationToken token)
      {
          var result = initial;

          // Enumerate with the caller's token and without resuming on the captured context.
          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              result = fold(result, element);
          }

          // Projection happens exactly once, on the fully accumulated value.
          return project(result);
      }
  }
  /// <summary>
  /// Folds an async-enumerable sequence into a running value using an asynchronous
  /// <paramref name="accumulator"/> starting from <paramref name="seed"/>, then maps the final
  /// running value through the asynchronous <paramref name="resultSelector"/>.
  /// </summary>
  /// <typeparam name="TSource">Type of the sequence elements.</typeparam>
  /// <typeparam name="TAccumulate">Type of the running value.</typeparam>
  /// <typeparam name="TResult">Type of the value produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">Sequence to aggregate.</param>
  /// <param name="seed">Initial running value; passed straight to <paramref name="resultSelector"/> when the sequence yields no elements.</param>
  /// <param name="accumulator">Asynchronous function invoked with the running value and the next element; its awaited result becomes the new running value.</param>
  /// <param name="resultSelector">Asynchronous function applied once, after enumeration finishes, to the final running value.</param>
  /// <param name="cancellationToken">Token applied to the enumeration of <paramref name="source"/>.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> that completes with the awaited result of <paramref name="resultSelector"/>.</returns>
  /// <remarks>
  /// Null <paramref name="source"/>, <paramref name="accumulator"/> or <paramref name="resultSelector"/>
  /// is rejected synchronously, before any enumeration starts, with the exception built by
  /// <c>Error.ArgumentNull</c>.
  /// </remarks>
  internal static ValueTask<TResult> AggregateAwaitAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      return FoldAndProjectAsync(source, seed, accumulator, resultSelector, cancellationToken);

      // The async state machine lives in a static local function so the argument checks above run eagerly.
      static async ValueTask<TResult> FoldAndProjectAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, ValueTask<TAccumulate>> fold, Func<TAccumulate, ValueTask<TResult>> project, CancellationToken token)
      {
          var result = initial;

          // Enumerate with the caller's token and without resuming on the captured context.
          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              // Each accumulator call is awaited without resuming on the captured context.
              result = await fold(result, element).ConfigureAwait(false);
          }

          // Projection happens exactly once, on the fully accumulated value.
          return await project(result).ConfigureAwait(false);
      }
  }
  170. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Folds an async-enumerable sequence into a running value using an asynchronous, cancellable
  /// <paramref name="accumulator"/> starting from <paramref name="seed"/>, then maps the final
  /// running value through the asynchronous, cancellable <paramref name="resultSelector"/>.
  /// </summary>
  /// <typeparam name="TSource">Type of the sequence elements.</typeparam>
  /// <typeparam name="TAccumulate">Type of the running value.</typeparam>
  /// <typeparam name="TResult">Type of the value produced by <paramref name="resultSelector"/>.</typeparam>
  /// <param name="source">Sequence to aggregate.</param>
  /// <param name="seed">Initial running value; passed straight to <paramref name="resultSelector"/> when the sequence yields no elements.</param>
  /// <param name="accumulator">Asynchronous function invoked with the running value, the next element and <paramref name="cancellationToken"/>; its awaited result becomes the new running value.</param>
  /// <param name="resultSelector">Asynchronous function applied once, after enumeration finishes, to the final running value and <paramref name="cancellationToken"/>.</param>
  /// <param name="cancellationToken">Token applied to the enumeration of <paramref name="source"/> and forwarded to both delegates.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> that completes with the awaited result of <paramref name="resultSelector"/>.</returns>
  /// <remarks>
  /// Null <paramref name="source"/>, <paramref name="accumulator"/> or <paramref name="resultSelector"/>
  /// is rejected synchronously, before any enumeration starts, with the exception built by
  /// <c>Error.ArgumentNull</c>.
  /// </remarks>
  internal static ValueTask<TResult> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  {
      if (source is null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (accumulator is null)
      {
          throw Error.ArgumentNull(nameof(accumulator));
      }

      if (resultSelector is null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

      return FoldAndProjectAsync(source, seed, accumulator, resultSelector, cancellationToken);

      // The async state machine lives in a static local function so the argument checks above run eagerly.
      static async ValueTask<TResult> FoldAndProjectAsync(IAsyncEnumerable<TSource> sequence, TAccumulate initial, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> fold, Func<TAccumulate, CancellationToken, ValueTask<TResult>> project, CancellationToken token)
      {
          var result = initial;

          // Enumerate with the caller's token and without resuming on the captured context.
          await foreach (var element in sequence.WithCancellation(token).ConfigureAwait(false))
          {
              // The same token that drives enumeration is forwarded to the accumulator.
              result = await fold(result, element, token).ConfigureAwait(false);
          }

          // Projection happens exactly once, on the fully accumulated value, with the same token.
          return await project(result, token).ConfigureAwait(false);
      }
  }
  190. #endif
  191. }
  192. }