Aggregate.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading.Tasks;
  5. namespace System.Reactive.Linq
  6. {
  /// <summary>
  /// <c>Aggregate</c> operators over <see cref="IAsyncObservable{T}"/> sequences.
  /// Every overload validates its arguments and then hands the source, the operator state and a
  /// non-capturing subscribe callback to <c>CreateAsyncObservable&lt;T&gt;.From</c>; the callback
  /// subscribes the source (through <c>SubscribeSafeAsync</c>) to the matching
  /// <c>AsyncObserver.Aggregate</c> observer built around the observer it is given.
  /// </summary>
  public partial class AsyncObservable
  {
      /// <summary>
      /// Aggregates the source without a seed, using a synchronous accumulator.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source and of the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="func">Accumulator handed to <c>AsyncObserver.Aggregate</c>.</param>
      /// <returns>The sequence produced by <c>CreateAsyncObservable&lt;TSource&gt;.From</c>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="func"/> is null.</exception>
      public static IAsyncObservable<TSource> Aggregate<TSource>(this IAsyncObservable<TSource> source, Func<TSource, TSource, TSource> func)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          return CreateAsyncObservable<TSource>.From(
              source,
              func,
              static (upstream, accumulator, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, accumulator)));
      }

      /// <summary>
      /// Aggregates the source without a seed, using an accumulator that returns a <see cref="ValueTask{TResult}"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source and of the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="func">Asynchronous accumulator handed to <c>AsyncObserver.Aggregate</c>.</param>
      /// <returns>The sequence produced by <c>CreateAsyncObservable&lt;TSource&gt;.From</c>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="func"/> is null.</exception>
      public static IAsyncObservable<TSource> Aggregate<TSource>(this IAsyncObservable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> func)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          return CreateAsyncObservable<TSource>.From(
              source,
              func,
              static (upstream, accumulator, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, accumulator)));
      }

      /// <summary>
      /// Aggregates the source starting from <paramref name="seed"/>, using a synchronous accumulator.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TResult">Type of the seed, the accumulator value and the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Accumulator handed to <c>AsyncObserver.Aggregate</c>.</param>
      /// <returns>The sequence produced by <c>CreateAsyncObservable&lt;TResult&gt;.From</c>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="func"/> is null.</exception>
      public static IAsyncObservable<TResult> Aggregate<TSource, TResult>(this IAsyncObservable<TSource> source, TResult seed, Func<TResult, TSource, TResult> func)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          // The seed and accumulator travel as a tuple so the subscribe callback can stay static.
          return CreateAsyncObservable<TResult>.From(
              source,
              (seed, func),
              static (upstream, args, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, args.seed, args.func)));
      }

      /// <summary>
      /// Aggregates the source starting from <paramref name="seed"/>, using an accumulator that
      /// returns a <see cref="ValueTask{TResult}"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TResult">Type of the seed, the accumulator value and the result.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Asynchronous accumulator handed to <c>AsyncObserver.Aggregate</c>.</param>
      /// <returns>The sequence produced by <c>CreateAsyncObservable&lt;TResult&gt;.From</c>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="func"/> is null.</exception>
      public static IAsyncObservable<TResult> Aggregate<TSource, TResult>(this IAsyncObservable<TSource> source, TResult seed, Func<TResult, TSource, ValueTask<TResult>> func)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          // The seed and accumulator travel as a tuple so the subscribe callback can stay static.
          return CreateAsyncObservable<TResult>.From(
              source,
              (seed, func),
              static (upstream, args, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, args.seed, args.func)));
      }

      /// <summary>
      /// Aggregates the source starting from <paramref name="seed"/> with a synchronous accumulator,
      /// then maps the final accumulator value through <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TAccumulate">Type of the seed and of the accumulator value.</typeparam>
      /// <typeparam name="TResult">Type produced by <paramref name="resultSelector"/>.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Accumulator handed to <c>AsyncObserver.Aggregate</c>.</param>
      /// <param name="resultSelector">Projection handed to <c>AsyncObserver.Aggregate</c>.</param>
      /// <returns>The sequence produced by <c>CreateAsyncObservable&lt;TResult&gt;.From</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="resultSelector"/> or <paramref name="func"/> is null
      /// (checked in that order).
      /// </exception>
      public static IAsyncObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          // Seed, accumulator and selector travel as a tuple so the subscribe callback can stay static.
          return CreateAsyncObservable<TResult>.From(
              source,
              (seed, func, resultSelector),
              static (upstream, args, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, args.seed, args.func, args.resultSelector)));
      }

      /// <summary>
      /// Aggregates the source starting from <paramref name="seed"/> with an asynchronous accumulator,
      /// then maps the final accumulator value through the asynchronous <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source.</typeparam>
      /// <typeparam name="TAccumulate">Type of the seed and of the accumulator value.</typeparam>
      /// <typeparam name="TResult">Type produced by <paramref name="resultSelector"/>.</typeparam>
      /// <param name="source">Sequence to aggregate.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Asynchronous accumulator handed to <c>AsyncObserver.Aggregate</c>.</param>
      /// <param name="resultSelector">Asynchronous projection handed to <c>AsyncObserver.Aggregate</c>.</param>
      /// <returns>The sequence produced by <c>CreateAsyncObservable&lt;TResult&gt;.From</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="resultSelector"/> or <paramref name="func"/> is null
      /// (checked in that order).
      /// </exception>
      public static IAsyncObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> func, Func<TAccumulate, ValueTask<TResult>> resultSelector)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          // Seed, accumulator and selector travel as a tuple so the subscribe callback can stay static.
          return CreateAsyncObservable<TResult>.From(
              source,
              (seed, func, resultSelector),
              static (upstream, args, downstream) =>
                  upstream.SubscribeSafeAsync(AsyncObserver.Aggregate(downstream, args.seed, args.func, args.resultSelector)));
      }
  }
  /// <summary>
  /// Observer-side implementations of the <c>Aggregate</c> operators. Each factory wraps a downstream
  /// observer in an observer (built with <c>Create&lt;TSource&gt;</c>) that folds incoming elements
  /// into a captured accumulator and only talks to the downstream observer on completion or failure.
  /// </summary>
  /// <remarks>
  /// NOTE(review): after forwarding an accumulator exception via <c>OnErrorAsync</c> these observers
  /// keep no "stopped" flag of their own; presumably the subscription machinery prevents further
  /// upstream notifications — confirm against <c>Create</c> / <c>SubscribeSafeAsync</c>.
  /// </remarks>
  public partial class AsyncObserver
  {
      /// <summary>
      /// Creates an observer that folds elements with <paramref name="func"/>, using the first
      /// element as the initial accumulator value.
      /// </summary>
      /// <typeparam name="TSource">Element type of both the input and the aggregated result.</typeparam>
      /// <param name="observer">Downstream observer that receives the final value or an error.</param>
      /// <param name="func">Synchronous accumulator; not invoked for the first element.</param>
      /// <returns>
      /// An observer that, on completion, forwards the accumulated value followed by completion, or
      /// forwards an <see cref="InvalidOperationException"/> if no element was received. Exceptions
      /// thrown by <paramref name="func"/> and upstream errors are forwarded through <c>OnErrorAsync</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="func"/> is null.</exception>
      public static IAsyncObserver<TSource> Aggregate<TSource>(IAsyncObserver<TSource> observer, Func<TSource, TSource, TSource> func)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var seenAny = false;
          var accumulated = default(TSource);

          return Create<TSource>(
              async item =>
              {
                  if (!seenAny)
                  {
                      // The first element seeds the accumulator; func is not called for it.
                      accumulated = item;
                      seenAny = true;
                      return;
                  }

                  try
                  {
                      accumulated = func(accumulated, item);
                  }
                  catch (Exception ex)
                  {
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              observer.OnErrorAsync,
              async () =>
              {
                  if (seenAny)
                  {
                      await observer.OnNextAsync(accumulated).ConfigureAwait(false);
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                      return;
                  }

                  await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
              });
      }

      /// <summary>
      /// Creates an observer that folds elements with the asynchronous <paramref name="func"/>,
      /// using the first element as the initial accumulator value.
      /// </summary>
      /// <typeparam name="TSource">Element type of both the input and the aggregated result.</typeparam>
      /// <param name="observer">Downstream observer that receives the final value or an error.</param>
      /// <param name="func">Asynchronous accumulator; not invoked for the first element.</param>
      /// <returns>
      /// An observer that, on completion, forwards the accumulated value followed by completion, or
      /// forwards an <see cref="InvalidOperationException"/> if no element was received. Exceptions
      /// raised while invoking or awaiting <paramref name="func"/> and upstream errors are forwarded
      /// through <c>OnErrorAsync</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="func"/> is null.</exception>
      public static IAsyncObserver<TSource> Aggregate<TSource>(IAsyncObserver<TSource> observer, Func<TSource, TSource, ValueTask<TSource>> func)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var seenAny = false;
          var accumulated = default(TSource);

          return Create<TSource>(
              async item =>
              {
                  if (!seenAny)
                  {
                      // The first element seeds the accumulator; func is not called for it.
                      accumulated = item;
                      seenAny = true;
                      return;
                  }

                  try
                  {
                      accumulated = await func(accumulated, item).ConfigureAwait(false);
                  }
                  catch (Exception ex)
                  {
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              observer.OnErrorAsync,
              async () =>
              {
                  if (seenAny)
                  {
                      await observer.OnNextAsync(accumulated).ConfigureAwait(false);
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                      return;
                  }

                  await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
              });
      }

      /// <summary>
      /// Creates a seeded aggregating observer with a synchronous accumulator by delegating to the
      /// result-selector overload with an identity selector.
      /// </summary>
      /// <typeparam name="TSource">Element type of the input.</typeparam>
      /// <typeparam name="TResult">Type of the seed, the accumulator value and the result.</typeparam>
      /// <param name="observer">Downstream observer that receives the final value or an error.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Synchronous accumulator.</param>
      /// <returns>The observer built by the result-selector overload.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="func"/> is null.</exception>
      public static IAsyncObserver<TSource> Aggregate<TSource, TResult>(IAsyncObserver<TResult> observer, TResult seed, Func<TResult, TSource, TResult> func)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          return Aggregate<TSource, TResult, TResult>(observer, seed, func, static acc => acc);
      }

      /// <summary>
      /// Creates a seeded aggregating observer with an asynchronous accumulator by delegating to the
      /// asynchronous result-selector overload with a selector that wraps the accumulator value in a
      /// completed <see cref="ValueTask{TResult}"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the input.</typeparam>
      /// <typeparam name="TResult">Type of the seed, the accumulator value and the result.</typeparam>
      /// <param name="observer">Downstream observer that receives the final value or an error.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Asynchronous accumulator.</param>
      /// <returns>The observer built by the asynchronous result-selector overload.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="func"/> is null.</exception>
      public static IAsyncObserver<TSource> Aggregate<TSource, TResult>(IAsyncObserver<TResult> observer, TResult seed, Func<TResult, TSource, ValueTask<TResult>> func)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          // func already has the exact delegate type the callee expects, so it is passed straight
          // through instead of being re-wrapped in a capturing lambda (saves an allocation and a hop).
          return Aggregate<TSource, TResult, TResult>(observer, seed, func, static acc => new ValueTask<TResult>(acc));
      }

      /// <summary>
      /// Creates an observer that folds elements into an accumulator starting at
      /// <paramref name="seed"/> and, on completion, projects the accumulator through
      /// <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the input.</typeparam>
      /// <typeparam name="TAccumulate">Type of the seed and of the accumulator value.</typeparam>
      /// <typeparam name="TResult">Type produced by <paramref name="resultSelector"/>.</typeparam>
      /// <param name="observer">Downstream observer that receives the projected value or an error.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Synchronous accumulator invoked for every element.</param>
      /// <param name="resultSelector">Synchronous projection applied once, on completion.</param>
      /// <returns>
      /// An observer that, on completion, forwards the projected value followed by completion.
      /// Exceptions thrown by <paramref name="func"/> or <paramref name="resultSelector"/> and
      /// upstream errors are forwarded through <c>OnErrorAsync</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/>, <paramref name="resultSelector"/> or <paramref name="func"/> is null
      /// (checked in that order).
      /// </exception>
      public static IAsyncObserver<TSource> Aggregate<TSource, TAccumulate, TResult>(IAsyncObserver<TResult> observer, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var accumulated = seed;

          return Create<TSource>(
              async item =>
              {
                  try
                  {
                      accumulated = func(accumulated, item);
                  }
                  catch (Exception ex)
                  {
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              observer.OnErrorAsync,
              async () =>
              {
                  TResult projected;

                  try
                  {
                      projected = resultSelector(accumulated);
                  }
                  catch (Exception ex)
                  {
                      // A failing selector produces an error instead of a value; nothing else is emitted.
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      return;
                  }

                  await observer.OnNextAsync(projected).ConfigureAwait(false);
                  await observer.OnCompletedAsync().ConfigureAwait(false);
              });
      }

      /// <summary>
      /// Creates an observer that folds elements into an accumulator starting at
      /// <paramref name="seed"/> using an asynchronous accumulator and, on completion, projects the
      /// accumulator through the asynchronous <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the input.</typeparam>
      /// <typeparam name="TAccumulate">Type of the seed and of the accumulator value.</typeparam>
      /// <typeparam name="TResult">Type produced by <paramref name="resultSelector"/>.</typeparam>
      /// <param name="observer">Downstream observer that receives the projected value or an error.</param>
      /// <param name="seed">Initial accumulator value (not null-checked).</param>
      /// <param name="func">Asynchronous accumulator invoked and awaited for every element.</param>
      /// <param name="resultSelector">Asynchronous projection applied once, on completion.</param>
      /// <returns>
      /// An observer that, on completion, forwards the projected value followed by completion.
      /// Exceptions raised while invoking or awaiting <paramref name="func"/> or
      /// <paramref name="resultSelector"/> and upstream errors are forwarded through <c>OnErrorAsync</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/>, <paramref name="resultSelector"/> or <paramref name="func"/> is null
      /// (checked in that order).
      /// </exception>
      public static IAsyncObserver<TSource> Aggregate<TSource, TAccumulate, TResult>(IAsyncObserver<TResult> observer, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> func, Func<TAccumulate, ValueTask<TResult>> resultSelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          if (func is null)
          {
              throw new ArgumentNullException(nameof(func));
          }

          var accumulated = seed;

          return Create<TSource>(
              async item =>
              {
                  try
                  {
                      accumulated = await func(accumulated, item).ConfigureAwait(false);
                  }
                  catch (Exception ex)
                  {
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              observer.OnErrorAsync,
              async () =>
              {
                  TResult projected;

                  try
                  {
                      projected = await resultSelector(accumulated).ConfigureAwait(false);
                  }
                  catch (Exception ex)
                  {
                      // A failing selector produces an error instead of a value; nothing else is emitted.
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      return;
                  }

                  await observer.OnNextAsync(projected).ConfigureAwait(false);
                  await observer.OnCompletedAsync().ConfigureAwait(false);
              });
      }
  }
  267. }