Aggregate.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading.Tasks;
  5. namespace System.Reactive.Linq
  6. {
  /// <summary>
  /// <c>Aggregate</c> operators for asynchronous observable sequences. Every overload validates
  /// its arguments eagerly and then builds the result through <c>Create</c>, whose subscribe
  /// delegate attaches the matching <see cref="AsyncObserver"/> aggregation observer to the source.
  /// </summary>
  public partial class AsyncObservable
  {
      /// <summary>
      /// Aggregates <paramref name="source"/> with a synchronous accumulator and no seed.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source and of the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="func">Accumulator combining the running value with the next element.</param>
      /// <returns>The sequence produced by <c>Create</c> for this aggregation.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="func"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> Aggregate<TSource>(this IAsyncObservable<TSource> source, Func<TSource, TSource, TSource> func)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          // The lambda only touches its own parameters, so it captures nothing from this method.
          // NOTE(review): default(TSource) presumably only pins the result element type for
          // inference inside Create - confirm against the Create helper.
          return Create(
              source,
              func,
              default(TSource),
              (upstream, accumulator, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, accumulator)));
      }

      /// <summary>
      /// Aggregates <paramref name="source"/> with an asynchronous accumulator and no seed.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source and of the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="func">Asynchronous accumulator combining the running value with the next element.</param>
      /// <returns>The sequence produced by <c>Create</c> for this aggregation.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="func"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> Aggregate<TSource>(this IAsyncObservable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> func)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          return Create(
              source,
              func,
              default(TSource),
              (upstream, accumulator, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, accumulator)));
      }

      /// <summary>
      /// Aggregates <paramref name="source"/> starting from <paramref name="seed"/> with a
      /// synchronous accumulator.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TResult">Type of the seed, the accumulator and the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Accumulator combining the running value with the next element.</param>
      /// <returns>The sequence produced by <c>Create</c> for this aggregation.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="func"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TResult> Aggregate<TSource, TResult>(this IAsyncObservable<TSource> source, TResult seed, Func<TResult, TSource, TResult> func)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          // Seed and accumulator travel together as the state argument of Create.
          var aggregation = (seed, func);

          return Create(
              source,
              aggregation,
              default(TResult),
              (upstream, args, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, args.seed, args.func)));
      }

      /// <summary>
      /// Aggregates <paramref name="source"/> starting from <paramref name="seed"/> with an
      /// asynchronous accumulator.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TResult">Type of the seed, the accumulator and the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Asynchronous accumulator combining the running value with the next element.</param>
      /// <returns>The sequence produced by <c>Create</c> for this aggregation.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="func"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TResult> Aggregate<TSource, TResult>(this IAsyncObservable<TSource> source, TResult seed, Func<TResult, TSource, ValueTask<TResult>> func)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var aggregation = (seed, func);

          return Create(
              source,
              aggregation,
              default(TResult),
              (upstream, args, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, args.seed, args.func)));
      }

      /// <summary>
      /// Aggregates <paramref name="source"/> starting from <paramref name="seed"/> with a
      /// synchronous accumulator, then projects the final accumulator through
      /// <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TAccumulate">Type of the seed and the running accumulator.</typeparam>
      /// <typeparam name="TResult">Element type of the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Accumulator combining the running value with the next element.</param>
      /// <param name="resultSelector">Projection applied to the final accumulator.</param>
      /// <returns>The sequence produced by <c>Create</c> for this aggregation.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="resultSelector"/> or <paramref name="func"/>
      /// is <c>null</c> (checked in that order).
      /// </exception>
      public static IAsyncObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var aggregation = (seed, func, resultSelector);

          return Create(
              source,
              aggregation,
              default(TResult),
              (upstream, args, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, args.seed, args.func, args.resultSelector)));
      }

      /// <summary>
      /// Aggregates <paramref name="source"/> starting from <paramref name="seed"/> with an
      /// asynchronous accumulator, then projects the final accumulator through the asynchronous
      /// <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TAccumulate">Type of the seed and the running accumulator.</typeparam>
      /// <typeparam name="TResult">Element type of the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Asynchronous accumulator combining the running value with the next element.</param>
      /// <param name="resultSelector">Asynchronous projection applied to the final accumulator.</param>
      /// <returns>The sequence produced by <c>Create</c> for this aggregation.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="resultSelector"/> or <paramref name="func"/>
      /// is <c>null</c> (checked in that order).
      /// </exception>
      public static IAsyncObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> func, Func<TAccumulate, ValueTask<TResult>> resultSelector)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var aggregation = (seed, func, resultSelector);

          return Create(
              source,
              aggregation,
              default(TResult),
              (upstream, args, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, args.seed, args.func, args.resultSelector)));
      }
  }
  /// <summary>
  /// Factories for the observers behind the <c>Aggregate</c> operators. Each returned observer
  /// keeps its running accumulator in locals captured by the handlers it passes to <c>Create</c>,
  /// hands the downstream observer's <c>OnErrorAsync</c> to <c>Create</c> as the error handler,
  /// and only calls the downstream <c>OnNextAsync</c> from its completion handler.
  /// </summary>
  public partial class AsyncObserver
  {
      /// <summary>
      /// Creates an observer that folds incoming elements with <paramref name="func"/>, using the
      /// first element as the initial accumulator.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source and of the result.</typeparam>
      /// <param name="observer">Downstream observer that receives the aggregate.</param>
      /// <param name="func">Accumulator combining the running value with the next element.</param>
      /// <returns>
      /// An observer that, on completion, sends the accumulated value followed by completion to
      /// <paramref name="observer"/>, or an <see cref="InvalidOperationException"/> via
      /// <c>OnErrorAsync</c> when no element was received. An exception thrown by
      /// <paramref name="func"/> is forwarded through <c>OnErrorAsync</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="func"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObserver<TSource> Aggregate<TSource>(IAsyncObserver<TSource> observer, Func<TSource, TSource, TSource> func)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (func == null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var hasValue = false;
          var value = default(TSource);

          return Create<TSource>(
              async x =>
              {
                  if (hasValue)
                  {
                      try
                      {
                          value = func(value, x);
                      }
                      catch (Exception ex)
                      {
                          // The accumulator keeps its previous value when func throws.
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      }
                  }
                  else
                  {
                      // First element: it becomes the accumulator and func is not invoked.
                      value = x;
                      hasValue = true;
                  }
              },
              observer.OnErrorAsync,
              async () =>
              {
                  if (!hasValue)
                  {
                      await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
                  }
                  else
                  {
                      await observer.OnNextAsync(value).ConfigureAwait(false);
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              });
      }

      /// <summary>
      /// Creates an observer that folds incoming elements with the asynchronous
      /// <paramref name="func"/>, using the first element as the initial accumulator.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source and of the result.</typeparam>
      /// <param name="observer">Downstream observer that receives the aggregate.</param>
      /// <param name="func">Asynchronous accumulator combining the running value with the next element.</param>
      /// <returns>
      /// An observer that, on completion, sends the accumulated value followed by completion to
      /// <paramref name="observer"/>, or an <see cref="InvalidOperationException"/> via
      /// <c>OnErrorAsync</c> when no element was received. An exception raised while invoking or
      /// awaiting <paramref name="func"/> is forwarded through <c>OnErrorAsync</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="func"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObserver<TSource> Aggregate<TSource>(IAsyncObserver<TSource> observer, Func<TSource, TSource, ValueTask<TSource>> func)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (func == null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var hasValue = false;
          var value = default(TSource);

          return Create<TSource>(
              async x =>
              {
                  if (hasValue)
                  {
                      try
                      {
                          value = await func(value, x).ConfigureAwait(false);
                      }
                      catch (Exception ex)
                      {
                          // The accumulator keeps its previous value when func fails.
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      }
                  }
                  else
                  {
                      // First element: it becomes the accumulator and func is not invoked.
                      value = x;
                      hasValue = true;
                  }
              },
              observer.OnErrorAsync,
              async () =>
              {
                  if (!hasValue)
                  {
                      await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
                  }
                  else
                  {
                      await observer.OnNextAsync(value).ConfigureAwait(false);
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              });
      }

      /// <summary>
      /// Creates an observer that folds incoming elements into <paramref name="seed"/> with
      /// <paramref name="func"/>. Delegates to the result-selector overload with an identity
      /// projection.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TResult">Type of the seed, the accumulator and the result.</typeparam>
      /// <param name="observer">Downstream observer that receives the aggregate.</param>
      /// <param name="seed">Initial accumulator value.</param>
      /// <param name="func">Accumulator combining the running value with the next element.</param>
      /// <returns>The observer built by the result-selector overload.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="func"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObserver<TSource> Aggregate<TSource, TResult>(IAsyncObserver<TResult> observer, TResult seed, Func<TResult, TSource, TResult> func)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (func == null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          return Aggregate(observer, seed, func, a => a);
      }

      /// <summary>
      /// Creates an observer that folds incoming elements into <paramref name="seed"/> with the
      /// asynchronous <paramref name="func"/>. Delegates to the asynchronous result-selector
      /// overload with a projection that wraps the accumulator in a completed
      /// <see cref="ValueTask{TResult}"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TResult">Type of the seed, the accumulator and the result.</typeparam>
      /// <param name="observer">Downstream observer that receives the aggregate.</param>
      /// <param name="seed">Initial accumulator value.</param>
      /// <param name="func">Asynchronous accumulator combining the running value with the next element.</param>
      /// <returns>The observer built by the asynchronous result-selector overload.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="func"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObserver<TSource> Aggregate<TSource, TResult>(IAsyncObserver<TResult> observer, TResult seed, Func<TResult, TSource, ValueTask<TResult>> func)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (func == null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          // func already has the delegate type the target overload expects, so it is passed
          // through as-is instead of being re-wrapped in a forwarding lambda (which cost an extra
          // delegate allocation and an extra indirection per element). The explicit type
          // arguments keep the ValueTask-based overload selected.
          return Aggregate<TSource, TResult, TResult>(observer, seed, func, a => new ValueTask<TResult>(a));
      }

      /// <summary>
      /// Creates an observer that folds incoming elements into <paramref name="seed"/> with
      /// <paramref name="func"/> and, on completion, projects the accumulator through
      /// <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TAccumulate">Type of the seed and the running accumulator.</typeparam>
      /// <typeparam name="TResult">Element type received by <paramref name="observer"/>.</typeparam>
      /// <param name="observer">Downstream observer that receives the projected aggregate.</param>
      /// <param name="seed">Initial accumulator value.</param>
      /// <param name="func">Accumulator combining the running value with the next element.</param>
      /// <param name="resultSelector">Projection applied to the final accumulator.</param>
      /// <returns>
      /// An observer that, on completion, sends the projected accumulator followed by completion
      /// to <paramref name="observer"/>. An exception thrown by <paramref name="func"/> or
      /// <paramref name="resultSelector"/> is forwarded through <c>OnErrorAsync</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/>, <paramref name="resultSelector"/> or
      /// <paramref name="func"/> is <c>null</c> (checked in that order).
      /// </exception>
      public static IAsyncObserver<TSource> Aggregate<TSource, TAccumulate, TResult>(IAsyncObserver<TResult> observer, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (resultSelector == null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          if (func == null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var value = seed;

          return Create<TSource>(
              async x =>
              {
                  try
                  {
                      value = func(value, x);
                  }
                  catch (Exception ex)
                  {
                      // The accumulator keeps its previous value when func throws.
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              observer.OnErrorAsync,
              async () =>
              {
                  TResult res;

                  try
                  {
                      res = resultSelector(value);
                  }
                  catch (Exception ex)
                  {
                      // A failing projection ends the sequence with an error; no value is sent.
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      return;
                  }

                  await observer.OnNextAsync(res).ConfigureAwait(false);
                  await observer.OnCompletedAsync().ConfigureAwait(false);
              });
      }

      /// <summary>
      /// Creates an observer that folds incoming elements into <paramref name="seed"/> with the
      /// asynchronous <paramref name="func"/> and, on completion, projects the accumulator through
      /// the asynchronous <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TAccumulate">Type of the seed and the running accumulator.</typeparam>
      /// <typeparam name="TResult">Element type received by <paramref name="observer"/>.</typeparam>
      /// <param name="observer">Downstream observer that receives the projected aggregate.</param>
      /// <param name="seed">Initial accumulator value.</param>
      /// <param name="func">Asynchronous accumulator combining the running value with the next element.</param>
      /// <param name="resultSelector">Asynchronous projection applied to the final accumulator.</param>
      /// <returns>
      /// An observer that, on completion, sends the projected accumulator followed by completion
      /// to <paramref name="observer"/>. An exception raised while invoking or awaiting
      /// <paramref name="func"/> or <paramref name="resultSelector"/> is forwarded through
      /// <c>OnErrorAsync</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/>, <paramref name="resultSelector"/> or
      /// <paramref name="func"/> is <c>null</c> (checked in that order).
      /// </exception>
      public static IAsyncObserver<TSource> Aggregate<TSource, TAccumulate, TResult>(IAsyncObserver<TResult> observer, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> func, Func<TAccumulate, ValueTask<TResult>> resultSelector)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (resultSelector == null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          if (func == null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var value = seed;

          return Create<TSource>(
              async x =>
              {
                  try
                  {
                      value = await func(value, x).ConfigureAwait(false);
                  }
                  catch (Exception ex)
                  {
                      // The accumulator keeps its previous value when func fails.
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              observer.OnErrorAsync,
              async () =>
              {
                  TResult res;

                  try
                  {
                      res = await resultSelector(value).ConfigureAwait(false);
                  }
                  catch (Exception ex)
                  {
                      // A failing projection ends the sequence with an error; no value is sent.
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      return;
                  }

                  await observer.OnNextAsync(res).ConfigureAwait(false);
                  await observer.OnCompletedAsync().ConfigureAwait(false);
              });
      }
  }
  273. }