Aggregate.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading.Tasks;
  5. namespace System.Reactive.Linq
  6. {
  7. partial class AsyncObservable
  8. {
  9. public static IAsyncObservable<TSource> Aggregate<TSource>(this IAsyncObservable<TSource> source, Func<TSource, TSource, TSource> func)
  10. {
  11. if (source == null)
  12. throw new ArgumentNullException(nameof(source));
  13. if (func == null)
  14. throw new ArgumentNullException(nameof(func));
  15. return Create<TSource>(observer => source.SubscribeSafeAsync(AsyncObserver.Aggregate(observer, func)));
  16. }
  17. public static IAsyncObservable<TSource> Aggregate<TSource>(this IAsyncObservable<TSource> source, Func<TSource, TSource, Task<TSource>> func)
  18. {
  19. if (source == null)
  20. throw new ArgumentNullException(nameof(source));
  21. if (func == null)
  22. throw new ArgumentNullException(nameof(func));
  23. return Create<TSource>(observer => source.SubscribeSafeAsync(AsyncObserver.Aggregate(observer, func)));
  24. }
  25. public static IAsyncObservable<TResult> Aggregate<TSource, TResult>(this IAsyncObservable<TSource> source, TResult seed, Func<TResult, TSource, TResult> func)
  26. {
  27. if (source == null)
  28. throw new ArgumentNullException(nameof(source));
  29. if (func == null)
  30. throw new ArgumentNullException(nameof(func));
  31. return Create<TResult>(observer => source.SubscribeSafeAsync(AsyncObserver.Aggregate(observer, seed, func)));
  32. }
  33. public static IAsyncObservable<TResult> Aggregate<TSource, TResult>(this IAsyncObservable<TSource> source, TResult seed, Func<TResult, TSource, Task<TResult>> func)
  34. {
  35. if (source == null)
  36. throw new ArgumentNullException(nameof(source));
  37. if (func == null)
  38. throw new ArgumentNullException(nameof(func));
  39. return Create<TResult>(observer => source.SubscribeSafeAsync(AsyncObserver.Aggregate(observer, seed, func)));
  40. }
  41. public static IAsyncObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector)
  42. {
  43. if (source == null)
  44. throw new ArgumentNullException(nameof(source));
  45. if (resultSelector == null)
  46. throw new ArgumentNullException(nameof(resultSelector));
  47. if (func == null)
  48. throw new ArgumentNullException(nameof(func));
  49. return Create<TResult>(observer => source.SubscribeSafeAsync(AsyncObserver.Aggregate(observer, seed, func, resultSelector)));
  50. }
  51. public static IAsyncObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> func, Func<TAccumulate, Task<TResult>> resultSelector)
  52. {
  53. if (source == null)
  54. throw new ArgumentNullException(nameof(source));
  55. if (resultSelector == null)
  56. throw new ArgumentNullException(nameof(resultSelector));
  57. if (func == null)
  58. throw new ArgumentNullException(nameof(func));
  59. return Create<TResult>(observer => source.SubscribeSafeAsync(AsyncObserver.Aggregate(observer, seed, func, resultSelector)));
  60. }
  61. }
  62. partial class AsyncObserver
  63. {
  64. public static IAsyncObserver<TSource> Aggregate<TSource>(IAsyncObserver<TSource> observer, Func<TSource, TSource, TSource> func)
  65. {
  66. if (observer == null)
  67. throw new ArgumentNullException(nameof(observer));
  68. if (func == null)
  69. throw new ArgumentNullException(nameof(func));
  70. var hasValue = false;
  71. var value = default(TSource);
  72. return Create<TSource>(
  73. async x =>
  74. {
  75. if (hasValue)
  76. {
  77. try
  78. {
  79. value = func(value, x);
  80. }
  81. catch (Exception ex)
  82. {
  83. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  84. return;
  85. }
  86. }
  87. else
  88. {
  89. value = x;
  90. hasValue = true;
  91. }
  92. },
  93. observer.OnErrorAsync,
  94. async () =>
  95. {
  96. if (!hasValue)
  97. {
  98. await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
  99. }
  100. else
  101. {
  102. await observer.OnNextAsync(value).ConfigureAwait(false);
  103. await observer.OnCompletedAsync().ConfigureAwait(false);
  104. }
  105. }
  106. );
  107. }
  108. public static IAsyncObserver<TSource> Aggregate<TSource>(IAsyncObserver<TSource> observer, Func<TSource, TSource, Task<TSource>> func)
  109. {
  110. if (observer == null)
  111. throw new ArgumentNullException(nameof(observer));
  112. if (func == null)
  113. throw new ArgumentNullException(nameof(func));
  114. var hasValue = false;
  115. var value = default(TSource);
  116. return Create<TSource>(
  117. async x =>
  118. {
  119. if (hasValue)
  120. {
  121. try
  122. {
  123. value = await func(value, x).ConfigureAwait(false);
  124. }
  125. catch (Exception ex)
  126. {
  127. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  128. return;
  129. }
  130. }
  131. else
  132. {
  133. value = x;
  134. hasValue = true;
  135. }
  136. },
  137. observer.OnErrorAsync,
  138. async () =>
  139. {
  140. if (!hasValue)
  141. {
  142. await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
  143. }
  144. else
  145. {
  146. await observer.OnNextAsync(value).ConfigureAwait(false);
  147. await observer.OnCompletedAsync().ConfigureAwait(false);
  148. }
  149. }
  150. );
  151. }
  152. public static IAsyncObserver<TSource> Aggregate<TSource, TResult>(IAsyncObserver<TResult> observer, TResult seed, Func<TResult, TSource, TResult> func)
  153. {
  154. if (observer == null)
  155. throw new ArgumentNullException(nameof(observer));
  156. if (func == null)
  157. throw new ArgumentNullException(nameof(func));
  158. return Aggregate(observer, seed, func, a => a);
  159. }
  160. public static IAsyncObserver<TSource> Aggregate<TSource, TResult>(IAsyncObserver<TResult> observer, TResult seed, Func<TResult, TSource, Task<TResult>> func)
  161. {
  162. if (observer == null)
  163. throw new ArgumentNullException(nameof(observer));
  164. if (func == null)
  165. throw new ArgumentNullException(nameof(func));
  166. return Aggregate<TSource, TResult, TResult>(observer, seed, (a, x) => func(a, x), a => Task.FromResult(a));
  167. }
  168. public static IAsyncObserver<TSource> Aggregate<TSource, TAccumulate, TResult>(IAsyncObserver<TResult> observer, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector)
  169. {
  170. if (observer == null)
  171. throw new ArgumentNullException(nameof(observer));
  172. if (resultSelector == null)
  173. throw new ArgumentNullException(nameof(resultSelector));
  174. if (func == null)
  175. throw new ArgumentNullException(nameof(func));
  176. var value = seed;
  177. return Create<TSource>(
  178. async x =>
  179. {
  180. try
  181. {
  182. value = func(value, x);
  183. }
  184. catch (Exception ex)
  185. {
  186. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  187. return;
  188. }
  189. },
  190. observer.OnErrorAsync,
  191. async () =>
  192. {
  193. var res = default(TResult);
  194. try
  195. {
  196. res = resultSelector(value);
  197. }
  198. catch (Exception ex)
  199. {
  200. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  201. return;
  202. }
  203. await observer.OnNextAsync(res).ConfigureAwait(false);
  204. await observer.OnCompletedAsync().ConfigureAwait(false);
  205. }
  206. );
  207. }
  208. public static IAsyncObserver<TSource> Aggregate<TSource, TAccumulate, TResult>(IAsyncObserver<TResult> observer, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> func, Func<TAccumulate, Task<TResult>> resultSelector)
  209. {
  210. if (observer == null)
  211. throw new ArgumentNullException(nameof(observer));
  212. if (resultSelector == null)
  213. throw new ArgumentNullException(nameof(resultSelector));
  214. if (func == null)
  215. throw new ArgumentNullException(nameof(func));
  216. var value = seed;
  217. return Create<TSource>(
  218. async x =>
  219. {
  220. try
  221. {
  222. value = await func(value, x).ConfigureAwait(false);
  223. }
  224. catch (Exception ex)
  225. {
  226. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  227. return;
  228. }
  229. },
  230. observer.OnErrorAsync,
  231. async () =>
  232. {
  233. var res = default(TResult);
  234. try
  235. {
  236. res = await resultSelector(value).ConfigureAwait(false);
  237. }
  238. catch (Exception ex)
  239. {
  240. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  241. return;
  242. }
  243. await observer.OnNextAsync(res).ConfigureAwait(false);
  244. await observer.OnCompletedAsync().ConfigureAwait(false);
  245. }
  246. );
  247. }
  248. }
  249. }