1
0

Sum.cs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading.Tasks;
  5. namespace System.Reactive.Linq
  6. {
  7. partial class AsyncObserver
  8. {
  9. public static IAsyncObserver<int> SumInt32(IAsyncObserver<int> observer)
  10. {
  11. if (observer == null)
  12. throw new ArgumentNullException(nameof(observer));
  13. var sum = 0;
  14. return Create<int>(
  15. async x =>
  16. {
  17. try
  18. {
  19. checked
  20. {
  21. sum += x;
  22. }
  23. }
  24. catch (Exception ex)
  25. {
  26. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  27. }
  28. },
  29. observer.OnErrorAsync,
  30. async () =>
  31. {
  32. await observer.OnNextAsync(sum).ConfigureAwait(false);
  33. await observer.OnCompletedAsync().ConfigureAwait(false);
  34. }
  35. );
  36. }
  37. public static IAsyncObserver<long> SumInt64(IAsyncObserver<long> observer)
  38. {
  39. if (observer == null)
  40. throw new ArgumentNullException(nameof(observer));
  41. var sum = 0L;
  42. return Create<long>(
  43. async x =>
  44. {
  45. try
  46. {
  47. checked
  48. {
  49. sum += x;
  50. }
  51. }
  52. catch (Exception ex)
  53. {
  54. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  55. }
  56. },
  57. observer.OnErrorAsync,
  58. async () =>
  59. {
  60. await observer.OnNextAsync(sum).ConfigureAwait(false);
  61. await observer.OnCompletedAsync().ConfigureAwait(false);
  62. }
  63. );
  64. }
  65. public static IAsyncObserver<float> SumSingle(IAsyncObserver<float> observer)
  66. {
  67. if (observer == null)
  68. throw new ArgumentNullException(nameof(observer));
  69. var sum = 0.0;
  70. return Create<float>(
  71. x =>
  72. {
  73. sum += x;
  74. return Task.CompletedTask;
  75. },
  76. observer.OnErrorAsync,
  77. async () =>
  78. {
  79. var res = default(float);
  80. try
  81. {
  82. checked
  83. {
  84. res = (float)sum;
  85. }
  86. }
  87. catch (Exception ex)
  88. {
  89. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  90. return;
  91. }
  92. await observer.OnNextAsync(res).ConfigureAwait(false);
  93. await observer.OnCompletedAsync().ConfigureAwait(false);
  94. }
  95. );
  96. }
  97. public static IAsyncObserver<double> SumDouble(IAsyncObserver<double> observer)
  98. {
  99. if (observer == null)
  100. throw new ArgumentNullException(nameof(observer));
  101. var sum = 0.0;
  102. return Create<double>(
  103. x =>
  104. {
  105. sum += x;
  106. return Task.CompletedTask;
  107. },
  108. observer.OnErrorAsync,
  109. async () =>
  110. {
  111. await observer.OnNextAsync(sum).ConfigureAwait(false);
  112. await observer.OnCompletedAsync().ConfigureAwait(false);
  113. }
  114. );
  115. }
  116. public static IAsyncObserver<decimal> SumDecimal(IAsyncObserver<decimal> observer)
  117. {
  118. if (observer == null)
  119. throw new ArgumentNullException(nameof(observer));
  120. var sum = 0m;
  121. return Create<decimal>(
  122. x =>
  123. {
  124. sum += x;
  125. return Task.CompletedTask;
  126. },
  127. observer.OnErrorAsync,
  128. async () =>
  129. {
  130. await observer.OnNextAsync(sum).ConfigureAwait(false);
  131. await observer.OnCompletedAsync().ConfigureAwait(false);
  132. }
  133. );
  134. }
  135. public static IAsyncObserver<int?> SumNullableInt32(IAsyncObserver<int?> observer)
  136. {
  137. if (observer == null)
  138. throw new ArgumentNullException(nameof(observer));
  139. var sum = 0;
  140. return Create<int?>(
  141. async x =>
  142. {
  143. try
  144. {
  145. checked
  146. {
  147. if (x != null)
  148. {
  149. sum += x.GetValueOrDefault();
  150. }
  151. }
  152. }
  153. catch (Exception ex)
  154. {
  155. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  156. }
  157. },
  158. observer.OnErrorAsync,
  159. async () =>
  160. {
  161. await observer.OnNextAsync(sum).ConfigureAwait(false);
  162. await observer.OnCompletedAsync().ConfigureAwait(false);
  163. }
  164. );
  165. }
  166. public static IAsyncObserver<long?> SumNullableInt64(IAsyncObserver<long?> observer)
  167. {
  168. if (observer == null)
  169. throw new ArgumentNullException(nameof(observer));
  170. var sum = (long?)0L;
  171. return Create<long?>(
  172. async x =>
  173. {
  174. try
  175. {
  176. checked
  177. {
  178. if (x != null)
  179. {
  180. sum += x.GetValueOrDefault();
  181. }
  182. }
  183. }
  184. catch (Exception ex)
  185. {
  186. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  187. }
  188. },
  189. observer.OnErrorAsync,
  190. async () =>
  191. {
  192. await observer.OnNextAsync(sum).ConfigureAwait(false);
  193. await observer.OnCompletedAsync().ConfigureAwait(false);
  194. }
  195. );
  196. }
  197. public static IAsyncObserver<float?> SumNullableSingle(IAsyncObserver<float?> observer)
  198. {
  199. if (observer == null)
  200. throw new ArgumentNullException(nameof(observer));
  201. var sum = 0.0;
  202. return Create<float?>(
  203. x =>
  204. {
  205. if (x != null)
  206. {
  207. sum += x.GetValueOrDefault();
  208. }
  209. return Task.CompletedTask;
  210. },
  211. observer.OnErrorAsync,
  212. async () =>
  213. {
  214. var res = default(float);
  215. try
  216. {
  217. checked
  218. {
  219. res = (float)sum;
  220. }
  221. }
  222. catch (Exception ex)
  223. {
  224. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  225. return;
  226. }
  227. await observer.OnNextAsync(res).ConfigureAwait(false);
  228. await observer.OnCompletedAsync().ConfigureAwait(false);
  229. }
  230. );
  231. }
  232. public static IAsyncObserver<double?> SumNullableDouble(IAsyncObserver<double?> observer)
  233. {
  234. if (observer == null)
  235. throw new ArgumentNullException(nameof(observer));
  236. var sum = 0.0;
  237. return Create<double?>(
  238. x =>
  239. {
  240. if (x != null)
  241. {
  242. sum += x.GetValueOrDefault();
  243. }
  244. return Task.CompletedTask;
  245. },
  246. observer.OnErrorAsync,
  247. async () =>
  248. {
  249. await observer.OnNextAsync(sum).ConfigureAwait(false);
  250. await observer.OnCompletedAsync().ConfigureAwait(false);
  251. }
  252. );
  253. }
  254. public static IAsyncObserver<decimal?> SumNullableDecimal(IAsyncObserver<decimal?> observer)
  255. {
  256. if (observer == null)
  257. throw new ArgumentNullException(nameof(observer));
  258. var sum = 0m;
  259. return Create<decimal?>(
  260. x =>
  261. {
  262. if (x != null)
  263. {
  264. sum += x.GetValueOrDefault();
  265. }
  266. return Task.CompletedTask;
  267. },
  268. observer.OnErrorAsync,
  269. async () =>
  270. {
  271. await observer.OnNextAsync(sum).ConfigureAwait(false);
  272. await observer.OnCompletedAsync().ConfigureAwait(false);
  273. }
  274. );
  275. }
  276. }
  277. }