Aggregate.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  12. {
  13. if (source == null)
  14. throw Error.ArgumentNull(nameof(source));
  15. if (accumulator == null)
  16. throw Error.ArgumentNull(nameof(accumulator));
  17. return AggregateCore(source, accumulator, CancellationToken.None);
  18. }
  19. public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
  20. {
  21. if (source == null)
  22. throw Error.ArgumentNull(nameof(source));
  23. if (accumulator == null)
  24. throw Error.ArgumentNull(nameof(accumulator));
  25. return AggregateCore(source, accumulator, cancellationToken);
  26. }
  27. public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator)
  28. {
  29. if (source == null)
  30. throw Error.ArgumentNull(nameof(source));
  31. if (accumulator == null)
  32. throw Error.ArgumentNull(nameof(accumulator));
  33. return AggregateCore(source, accumulator, CancellationToken.None);
  34. }
  35. public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
  36. {
  37. if (source == null)
  38. throw Error.ArgumentNull(nameof(source));
  39. if (accumulator == null)
  40. throw Error.ArgumentNull(nameof(accumulator));
  41. return AggregateCore(source, accumulator, cancellationToken);
  42. }
  43. #if !NO_DEEP_CANCELLATION
  44. public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
  45. {
  46. if (source == null)
  47. throw Error.ArgumentNull(nameof(source));
  48. if (accumulator == null)
  49. throw Error.ArgumentNull(nameof(accumulator));
  50. return AggregateCore(source, accumulator, cancellationToken);
  51. }
  52. #endif
  53. public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  54. {
  55. if (source == null)
  56. throw Error.ArgumentNull(nameof(source));
  57. if (accumulator == null)
  58. throw Error.ArgumentNull(nameof(accumulator));
  59. return AggregateCore(source, seed, accumulator, x => x, CancellationToken.None);
  60. }
  61. public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken)
  62. {
  63. if (source == null)
  64. throw Error.ArgumentNull(nameof(source));
  65. if (accumulator == null)
  66. throw Error.ArgumentNull(nameof(accumulator));
  67. return AggregateCore(source, seed, accumulator, x => x, cancellationToken);
  68. }
  69. public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator)
  70. {
  71. if (source == null)
  72. throw Error.ArgumentNull(nameof(source));
  73. if (accumulator == null)
  74. throw Error.ArgumentNull(nameof(accumulator));
  75. return AggregateCore(source, seed, accumulator, CancellationToken.None);
  76. }
  77. public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
  78. {
  79. if (source == null)
  80. throw Error.ArgumentNull(nameof(source));
  81. if (accumulator == null)
  82. throw Error.ArgumentNull(nameof(accumulator));
  83. return AggregateCore(source, seed, accumulator, cancellationToken);
  84. }
  85. #if !NO_DEEP_CANCELLATION
  86. public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
  87. {
  88. if (source == null)
  89. throw Error.ArgumentNull(nameof(source));
  90. if (accumulator == null)
  91. throw Error.ArgumentNull(nameof(accumulator));
  92. return AggregateCore(source, seed, accumulator, cancellationToken);
  93. }
  94. #endif
  95. public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
  96. {
  97. if (source == null)
  98. throw Error.ArgumentNull(nameof(source));
  99. if (accumulator == null)
  100. throw Error.ArgumentNull(nameof(accumulator));
  101. if (resultSelector == null)
  102. throw Error.ArgumentNull(nameof(resultSelector));
  103. return AggregateCore(source, seed, accumulator, resultSelector, CancellationToken.None);
  104. }
  105. public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
  106. {
  107. if (source == null)
  108. throw Error.ArgumentNull(nameof(source));
  109. if (accumulator == null)
  110. throw Error.ArgumentNull(nameof(accumulator));
  111. if (resultSelector == null)
  112. throw Error.ArgumentNull(nameof(resultSelector));
  113. return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
  114. }
  115. public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector)
  116. {
  117. if (source == null)
  118. throw Error.ArgumentNull(nameof(source));
  119. if (accumulator == null)
  120. throw Error.ArgumentNull(nameof(accumulator));
  121. if (resultSelector == null)
  122. throw Error.ArgumentNull(nameof(resultSelector));
  123. return AggregateCore(source, seed, accumulator, resultSelector, CancellationToken.None);
  124. }
  125. public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
  126. {
  127. if (source == null)
  128. throw Error.ArgumentNull(nameof(source));
  129. if (accumulator == null)
  130. throw Error.ArgumentNull(nameof(accumulator));
  131. if (resultSelector == null)
  132. throw Error.ArgumentNull(nameof(resultSelector));
  133. return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
  134. }
  135. #if !NO_DEEP_CANCELLATION
  136. public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
  137. {
  138. if (source == null)
  139. throw Error.ArgumentNull(nameof(source));
  140. if (accumulator == null)
  141. throw Error.ArgumentNull(nameof(accumulator));
  142. if (resultSelector == null)
  143. throw Error.ArgumentNull(nameof(resultSelector));
  144. return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
  145. }
  146. #endif
  147. private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
  148. {
  149. var acc = seed;
  150. var e = source.GetAsyncEnumerator(cancellationToken);
  151. try
  152. {
  153. while (await e.MoveNextAsync().ConfigureAwait(false))
  154. {
  155. acc = accumulator(acc, e.Current);
  156. }
  157. }
  158. finally
  159. {
  160. await e.DisposeAsync().ConfigureAwait(false);
  161. }
  162. return resultSelector(acc);
  163. }
  164. private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
  165. {
  166. var e = source.GetAsyncEnumerator(cancellationToken);
  167. try
  168. {
  169. if (!await e.MoveNextAsync().ConfigureAwait(false))
  170. {
  171. throw Error.NoElements();
  172. }
  173. var acc = e.Current;
  174. while (await e.MoveNextAsync().ConfigureAwait(false))
  175. {
  176. acc = accumulator(acc, e.Current);
  177. }
  178. return acc;
  179. }
  180. finally
  181. {
  182. await e.DisposeAsync().ConfigureAwait(false);
  183. }
  184. }
  185. private static async Task<TResult> AggregateCore<TSource, TResult>(IAsyncEnumerable<TSource> source, TResult seed, Func<TResult, TSource, ValueTask<TResult>> accumulator, CancellationToken cancellationToken)
  186. {
  187. var acc = seed;
  188. var e = source.GetAsyncEnumerator(cancellationToken);
  189. try
  190. {
  191. while (await e.MoveNextAsync().ConfigureAwait(false))
  192. {
  193. acc = await accumulator(acc, e.Current).ConfigureAwait(false);
  194. }
  195. }
  196. finally
  197. {
  198. await e.DisposeAsync().ConfigureAwait(false);
  199. }
  200. return acc;
  201. }
  202. #if !NO_DEEP_CANCELLATION
  203. private static async Task<TResult> AggregateCore<TSource, TResult>(IAsyncEnumerable<TSource> source, TResult seed, Func<TResult, TSource, CancellationToken, ValueTask<TResult>> accumulator, CancellationToken cancellationToken)
  204. {
  205. var acc = seed;
  206. var e = source.GetAsyncEnumerator(cancellationToken);
  207. try
  208. {
  209. while (await e.MoveNextAsync().ConfigureAwait(false))
  210. {
  211. acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
  212. }
  213. }
  214. finally
  215. {
  216. await e.DisposeAsync().ConfigureAwait(false);
  217. }
  218. return acc;
  219. }
  220. #endif
  221. private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
  222. {
  223. var acc = seed;
  224. var e = source.GetAsyncEnumerator(cancellationToken);
  225. try
  226. {
  227. while (await e.MoveNextAsync().ConfigureAwait(false))
  228. {
  229. acc = await accumulator(acc, e.Current).ConfigureAwait(false);
  230. }
  231. }
  232. finally
  233. {
  234. await e.DisposeAsync().ConfigureAwait(false);
  235. }
  236. return await resultSelector(acc).ConfigureAwait(false);
  237. }
  238. #if !NO_DEEP_CANCELLATION
  239. private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
  240. {
  241. var acc = seed;
  242. var e = source.GetAsyncEnumerator(cancellationToken);
  243. try
  244. {
  245. while (await e.MoveNextAsync().ConfigureAwait(false))
  246. {
  247. acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
  248. }
  249. }
  250. finally
  251. {
  252. await e.DisposeAsync().ConfigureAwait(false);
  253. }
  254. return await resultSelector(acc, cancellationToken).ConfigureAwait(false);
  255. }
  256. #endif
  257. private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
  258. {
  259. var e = source.GetAsyncEnumerator(cancellationToken);
  260. try
  261. {
  262. if (!await e.MoveNextAsync().ConfigureAwait(false))
  263. {
  264. throw Error.NoElements();
  265. }
  266. var acc = e.Current;
  267. while (await e.MoveNextAsync().ConfigureAwait(false))
  268. {
  269. acc = await accumulator(acc, e.Current).ConfigureAwait(false);
  270. }
  271. return acc;
  272. }
  273. finally
  274. {
  275. await e.DisposeAsync().ConfigureAwait(false);
  276. }
  277. }
  278. #if !NO_DEEP_CANCELLATION
  279. private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
  280. {
  281. var e = source.GetAsyncEnumerator(cancellationToken);
  282. try
  283. {
  284. if (!await e.MoveNextAsync().ConfigureAwait(false))
  285. {
  286. throw Error.NoElements();
  287. }
  288. var acc = e.Current;
  289. while (await e.MoveNextAsync().ConfigureAwait(false))
  290. {
  291. acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
  292. }
  293. return acc;
  294. }
  295. finally
  296. {
  297. await e.DisposeAsync().ConfigureAwait(false);
  298. }
  299. }
  300. #endif
  301. }
  302. }