1
0

Average.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. namespace System.Reactive.Linq
  5. {
  6. public partial class AsyncObserver
  7. {
  8. public static IAsyncObserver<int> AverageInt32(IAsyncObserver<double> observer)
  9. {
  10. if (observer == null)
  11. throw new ArgumentNullException(nameof(observer));
  12. var sum = 0L;
  13. var count = 0L;
  14. return Create<int>(
  15. async x =>
  16. {
  17. try
  18. {
  19. checked
  20. {
  21. sum += x;
  22. count++;
  23. }
  24. }
  25. catch (Exception ex)
  26. {
  27. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  28. }
  29. },
  30. observer.OnErrorAsync,
  31. async () =>
  32. {
  33. if (count > 0)
  34. {
  35. var res = default(double);
  36. try
  37. {
  38. checked
  39. {
  40. res = (double)sum / count;
  41. }
  42. }
  43. catch (Exception ex)
  44. {
  45. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  46. return;
  47. }
  48. await observer.OnNextAsync(res).ConfigureAwait(false);
  49. await observer.OnCompletedAsync().ConfigureAwait(false);
  50. }
  51. else
  52. {
  53. await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
  54. }
  55. }
  56. );
  57. }
  58. public static IAsyncObserver<long> AverageInt64(IAsyncObserver<double> observer)
  59. {
  60. if (observer == null)
  61. throw new ArgumentNullException(nameof(observer));
  62. var sum = 0L;
  63. var count = 0L;
  64. return Create<long>(
  65. async x =>
  66. {
  67. try
  68. {
  69. checked
  70. {
  71. sum += x;
  72. count++;
  73. }
  74. }
  75. catch (Exception ex)
  76. {
  77. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  78. }
  79. },
  80. observer.OnErrorAsync,
  81. async () =>
  82. {
  83. if (count > 0)
  84. {
  85. var res = default(double);
  86. try
  87. {
  88. checked
  89. {
  90. res = (double)sum / count;
  91. }
  92. }
  93. catch (Exception ex)
  94. {
  95. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  96. return;
  97. }
  98. await observer.OnNextAsync(res).ConfigureAwait(false);
  99. await observer.OnCompletedAsync().ConfigureAwait(false);
  100. }
  101. else
  102. {
  103. await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
  104. }
  105. }
  106. );
  107. }
  108. public static IAsyncObserver<float> AverageSingle(IAsyncObserver<float> observer)
  109. {
  110. if (observer == null)
  111. throw new ArgumentNullException(nameof(observer));
  112. var sum = 0.0;
  113. var count = 0L;
  114. return Create<float>(
  115. async x =>
  116. {
  117. try
  118. {
  119. checked
  120. {
  121. sum += x;
  122. count++;
  123. }
  124. }
  125. catch (Exception ex)
  126. {
  127. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  128. }
  129. },
  130. observer.OnErrorAsync,
  131. async () =>
  132. {
  133. if (count > 0)
  134. {
  135. var res = default(float);
  136. try
  137. {
  138. checked
  139. {
  140. res = (float)(sum / count);
  141. }
  142. }
  143. catch (Exception ex)
  144. {
  145. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  146. return;
  147. }
  148. await observer.OnNextAsync(res).ConfigureAwait(false);
  149. await observer.OnCompletedAsync().ConfigureAwait(false);
  150. }
  151. else
  152. {
  153. await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
  154. }
  155. }
  156. );
  157. }
  158. public static IAsyncObserver<double> AverageDouble(IAsyncObserver<double> observer)
  159. {
  160. if (observer == null)
  161. throw new ArgumentNullException(nameof(observer));
  162. var sum = 0.0;
  163. var count = 0L;
  164. return Create<double>(
  165. async x =>
  166. {
  167. try
  168. {
  169. checked
  170. {
  171. sum += x;
  172. count++;
  173. }
  174. }
  175. catch (Exception ex)
  176. {
  177. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  178. }
  179. },
  180. observer.OnErrorAsync,
  181. async () =>
  182. {
  183. if (count > 0)
  184. {
  185. var res = default(double);
  186. try
  187. {
  188. checked
  189. {
  190. res = sum / count;
  191. }
  192. }
  193. catch (Exception ex)
  194. {
  195. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  196. return;
  197. }
  198. await observer.OnNextAsync(res).ConfigureAwait(false);
  199. await observer.OnCompletedAsync().ConfigureAwait(false);
  200. }
  201. else
  202. {
  203. await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
  204. }
  205. }
  206. );
  207. }
  208. public static IAsyncObserver<decimal> AverageDecimal(IAsyncObserver<decimal> observer)
  209. {
  210. if (observer == null)
  211. throw new ArgumentNullException(nameof(observer));
  212. var sum = 0m;
  213. var count = 0L;
  214. return Create<decimal>(
  215. async x =>
  216. {
  217. try
  218. {
  219. checked
  220. {
  221. sum += x;
  222. count++;
  223. }
  224. }
  225. catch (Exception ex)
  226. {
  227. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  228. }
  229. },
  230. observer.OnErrorAsync,
  231. async () =>
  232. {
  233. if (count > 0)
  234. {
  235. var res = default(decimal);
  236. try
  237. {
  238. checked
  239. {
  240. res = sum / count;
  241. }
  242. }
  243. catch (Exception ex)
  244. {
  245. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  246. return;
  247. }
  248. await observer.OnNextAsync(res).ConfigureAwait(false);
  249. await observer.OnCompletedAsync().ConfigureAwait(false);
  250. }
  251. else
  252. {
  253. await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
  254. }
  255. }
  256. );
  257. }
  258. public static IAsyncObserver<int?> AverageNullableInt32(IAsyncObserver<double?> observer)
  259. {
  260. if (observer == null)
  261. throw new ArgumentNullException(nameof(observer));
  262. var sum = 0L;
  263. var count = 0L;
  264. return Create<int?>(
  265. async x =>
  266. {
  267. try
  268. {
  269. if (x.HasValue)
  270. {
  271. checked
  272. {
  273. sum += x.GetValueOrDefault();
  274. count++;
  275. }
  276. }
  277. }
  278. catch (Exception ex)
  279. {
  280. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  281. }
  282. },
  283. observer.OnErrorAsync,
  284. async () =>
  285. {
  286. if (count > 0)
  287. {
  288. var res = default(double);
  289. try
  290. {
  291. checked
  292. {
  293. res = (double)sum / count;
  294. }
  295. }
  296. catch (Exception ex)
  297. {
  298. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  299. return;
  300. }
  301. await observer.OnNextAsync(res).ConfigureAwait(false);
  302. await observer.OnCompletedAsync().ConfigureAwait(false);
  303. }
  304. else
  305. {
  306. await observer.OnNextAsync(null).ConfigureAwait(false);
  307. await observer.OnCompletedAsync().ConfigureAwait(false);
  308. }
  309. }
  310. );
  311. }
  312. public static IAsyncObserver<long?> AverageNullableInt64(IAsyncObserver<double?> observer)
  313. {
  314. if (observer == null)
  315. throw new ArgumentNullException(nameof(observer));
  316. var sum = 0L;
  317. var count = 0L;
  318. return Create<long?>(
  319. async x =>
  320. {
  321. try
  322. {
  323. if (x.HasValue)
  324. {
  325. checked
  326. {
  327. sum += x.GetValueOrDefault();
  328. count++;
  329. }
  330. }
  331. }
  332. catch (Exception ex)
  333. {
  334. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  335. }
  336. },
  337. observer.OnErrorAsync,
  338. async () =>
  339. {
  340. if (count > 0)
  341. {
  342. var res = default(double);
  343. try
  344. {
  345. checked
  346. {
  347. res = (double)sum / count;
  348. }
  349. }
  350. catch (Exception ex)
  351. {
  352. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  353. return;
  354. }
  355. await observer.OnNextAsync(res).ConfigureAwait(false);
  356. await observer.OnCompletedAsync().ConfigureAwait(false);
  357. }
  358. else
  359. {
  360. await observer.OnNextAsync(null).ConfigureAwait(false);
  361. await observer.OnCompletedAsync().ConfigureAwait(false);
  362. }
  363. }
  364. );
  365. }
  366. public static IAsyncObserver<float?> AverageNullableSingle(IAsyncObserver<float?> observer)
  367. {
  368. if (observer == null)
  369. throw new ArgumentNullException(nameof(observer));
  370. var sum = 0.0;
  371. var count = 0L;
  372. return Create<float?>(
  373. async x =>
  374. {
  375. try
  376. {
  377. if (x.HasValue)
  378. {
  379. checked
  380. {
  381. sum += x.GetValueOrDefault();
  382. count++;
  383. }
  384. }
  385. }
  386. catch (Exception ex)
  387. {
  388. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  389. }
  390. },
  391. observer.OnErrorAsync,
  392. async () =>
  393. {
  394. if (count > 0)
  395. {
  396. var res = default(float);
  397. try
  398. {
  399. checked
  400. {
  401. res = (float)(sum / count);
  402. }
  403. }
  404. catch (Exception ex)
  405. {
  406. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  407. return;
  408. }
  409. await observer.OnNextAsync(res).ConfigureAwait(false);
  410. await observer.OnCompletedAsync().ConfigureAwait(false);
  411. }
  412. else
  413. {
  414. await observer.OnNextAsync(null).ConfigureAwait(false);
  415. await observer.OnCompletedAsync().ConfigureAwait(false);
  416. }
  417. }
  418. );
  419. }
  420. public static IAsyncObserver<double?> AverageNullableDouble(IAsyncObserver<double?> observer)
  421. {
  422. if (observer == null)
  423. throw new ArgumentNullException(nameof(observer));
  424. var sum = 0.0;
  425. var count = 0L;
  426. return Create<double?>(
  427. async x =>
  428. {
  429. try
  430. {
  431. if (x.HasValue)
  432. {
  433. checked
  434. {
  435. sum += x.GetValueOrDefault();
  436. count++;
  437. }
  438. }
  439. }
  440. catch (Exception ex)
  441. {
  442. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  443. }
  444. },
  445. observer.OnErrorAsync,
  446. async () =>
  447. {
  448. if (count > 0)
  449. {
  450. var res = default(double);
  451. try
  452. {
  453. checked
  454. {
  455. res = sum / count;
  456. }
  457. }
  458. catch (Exception ex)
  459. {
  460. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  461. return;
  462. }
  463. await observer.OnNextAsync(res).ConfigureAwait(false);
  464. await observer.OnCompletedAsync().ConfigureAwait(false);
  465. }
  466. else
  467. {
  468. await observer.OnNextAsync(null).ConfigureAwait(false);
  469. await observer.OnCompletedAsync().ConfigureAwait(false);
  470. }
  471. }
  472. );
  473. }
  474. public static IAsyncObserver<decimal?> AverageNullableDecimal(IAsyncObserver<decimal?> observer)
  475. {
  476. if (observer == null)
  477. throw new ArgumentNullException(nameof(observer));
  478. var sum = 0m;
  479. var count = 0L;
  480. return Create<decimal?>(
  481. async x =>
  482. {
  483. try
  484. {
  485. if (x.HasValue)
  486. {
  487. checked
  488. {
  489. sum += x.GetValueOrDefault();
  490. count++;
  491. }
  492. }
  493. }
  494. catch (Exception ex)
  495. {
  496. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  497. }
  498. },
  499. observer.OnErrorAsync,
  500. async () =>
  501. {
  502. if (count > 0)
  503. {
  504. var res = default(decimal);
  505. try
  506. {
  507. checked
  508. {
  509. res = sum / count;
  510. }
  511. }
  512. catch (Exception ex)
  513. {
  514. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  515. return;
  516. }
  517. await observer.OnNextAsync(res).ConfigureAwait(false);
  518. await observer.OnCompletedAsync().ConfigureAwait(false);
  519. }
  520. else
  521. {
  522. await observer.OnNextAsync(null).ConfigureAwait(false);
  523. await observer.OnCompletedAsync().ConfigureAwait(false);
  524. }
  525. }
  526. );
  527. }
  528. }
  529. }