Average.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. namespace System.Reactive.Linq.ObservableImpl
  5. {
  6. internal sealed class AverageDouble : Producer<double, AverageDouble._>
  7. {
  8. private readonly IObservable<double> _source;
  9. public AverageDouble(IObservable<double> source)
  10. {
  11. _source = source;
  12. }
  13. protected override _ CreateSink(IObserver<double> observer) => new(observer);
  14. protected override void Run(_ sink) => sink.Run(_source);
  15. internal sealed class _ : IdentitySink<double>
  16. {
  17. private double _sum;
  18. private long _count;
  19. public _(IObserver<double> observer)
  20. : base(observer)
  21. {
  22. _sum = 0.0;
  23. _count = 0L;
  24. }
  25. public override void OnNext(double value)
  26. {
  27. try
  28. {
  29. checked
  30. {
  31. _sum += value;
  32. _count++;
  33. }
  34. }
  35. catch (Exception ex)
  36. {
  37. ForwardOnError(ex);
  38. }
  39. }
  40. public override void OnCompleted()
  41. {
  42. if (_count > 0)
  43. {
  44. ForwardOnNext(_sum / _count);
  45. ForwardOnCompleted();
  46. }
  47. else
  48. {
  49. try
  50. {
  51. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  52. }
  53. catch (Exception e)
  54. {
  55. ForwardOnError(e);
  56. }
  57. }
  58. }
  59. }
  60. }
  61. internal sealed class AverageSingle : Producer<float, AverageSingle._>
  62. {
  63. private readonly IObservable<float> _source;
  64. public AverageSingle(IObservable<float> source)
  65. {
  66. _source = source;
  67. }
  68. protected override _ CreateSink(IObserver<float> observer) => new(observer);
  69. protected override void Run(_ sink) => sink.Run(_source);
  70. internal sealed class _ : IdentitySink<float>
  71. {
  72. private double _sum; // NOTE: Uses a different accumulator type (double), conform LINQ to Objects.
  73. private long _count;
  74. public _(IObserver<float> observer)
  75. : base(observer)
  76. {
  77. _sum = 0.0;
  78. _count = 0L;
  79. }
  80. public override void OnNext(float value)
  81. {
  82. try
  83. {
  84. checked
  85. {
  86. _sum += value;
  87. _count++;
  88. }
  89. }
  90. catch (Exception ex)
  91. {
  92. ForwardOnError(ex);
  93. }
  94. }
  95. public override void OnCompleted()
  96. {
  97. if (_count > 0)
  98. {
  99. ForwardOnNext((float)(_sum / _count));
  100. ForwardOnCompleted();
  101. }
  102. else
  103. {
  104. try
  105. {
  106. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  107. }
  108. catch (Exception e)
  109. {
  110. ForwardOnError(e);
  111. }
  112. }
  113. }
  114. }
  115. }
  116. internal sealed class AverageDecimal : Producer<decimal, AverageDecimal._>
  117. {
  118. private readonly IObservable<decimal> _source;
  119. public AverageDecimal(IObservable<decimal> source)
  120. {
  121. _source = source;
  122. }
  123. protected override _ CreateSink(IObserver<decimal> observer) => new(observer);
  124. protected override void Run(_ sink) => sink.Run(_source);
  125. internal sealed class _ : IdentitySink<decimal>
  126. {
  127. private decimal _sum;
  128. private long _count;
  129. public _(IObserver<decimal> observer)
  130. : base(observer)
  131. {
  132. _sum = 0M;
  133. _count = 0L;
  134. }
  135. public override void OnNext(decimal value)
  136. {
  137. try
  138. {
  139. checked
  140. {
  141. _sum += value;
  142. _count++;
  143. }
  144. }
  145. catch (Exception ex)
  146. {
  147. ForwardOnError(ex);
  148. }
  149. }
  150. public override void OnCompleted()
  151. {
  152. if (_count > 0)
  153. {
  154. ForwardOnNext(_sum / _count);
  155. ForwardOnCompleted();
  156. }
  157. else
  158. {
  159. try
  160. {
  161. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  162. }
  163. catch (Exception e)
  164. {
  165. ForwardOnError(e);
  166. }
  167. }
  168. }
  169. }
  170. }
  171. internal sealed class AverageInt32 : Producer<double, AverageInt32._>
  172. {
  173. private readonly IObservable<int> _source;
  174. public AverageInt32(IObservable<int> source)
  175. {
  176. _source = source;
  177. }
  178. protected override _ CreateSink(IObserver<double> observer) => new(observer);
  179. protected override void Run(_ sink) => sink.Run(_source);
  180. internal sealed class _ : Sink<int, double>
  181. {
  182. private long _sum;
  183. private long _count;
  184. public _(IObserver<double> observer)
  185. : base(observer)
  186. {
  187. _sum = 0L;
  188. _count = 0L;
  189. }
  190. public override void OnNext(int value)
  191. {
  192. try
  193. {
  194. checked
  195. {
  196. _sum += value;
  197. _count++;
  198. }
  199. }
  200. catch (Exception ex)
  201. {
  202. ForwardOnError(ex);
  203. }
  204. }
  205. public override void OnCompleted()
  206. {
  207. if (_count > 0)
  208. {
  209. ForwardOnNext((double)_sum / _count);
  210. ForwardOnCompleted();
  211. }
  212. else
  213. {
  214. try
  215. {
  216. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  217. }
  218. catch (Exception e)
  219. {
  220. ForwardOnError(e);
  221. }
  222. }
  223. }
  224. }
  225. }
  226. internal sealed class AverageInt64 : Producer<double, AverageInt64._>
  227. {
  228. private readonly IObservable<long> _source;
  229. public AverageInt64(IObservable<long> source)
  230. {
  231. _source = source;
  232. }
  233. protected override _ CreateSink(IObserver<double> observer) => new(observer);
  234. protected override void Run(_ sink) => sink.Run(_source);
  235. internal sealed class _ : Sink<long, double>
  236. {
  237. private long _sum;
  238. private long _count;
  239. public _(IObserver<double> observer)
  240. : base(observer)
  241. {
  242. _sum = 0L;
  243. _count = 0L;
  244. }
  245. public override void OnNext(long value)
  246. {
  247. try
  248. {
  249. checked
  250. {
  251. _sum += value;
  252. _count++;
  253. }
  254. }
  255. catch (Exception ex)
  256. {
  257. ForwardOnError(ex);
  258. }
  259. }
  260. public override void OnCompleted()
  261. {
  262. if (_count > 0)
  263. {
  264. ForwardOnNext((double)_sum / _count);
  265. ForwardOnCompleted();
  266. }
  267. else
  268. {
  269. try
  270. {
  271. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  272. }
  273. catch (Exception e)
  274. {
  275. ForwardOnError(e);
  276. }
  277. }
  278. }
  279. }
  280. }
  281. internal sealed class AverageDoubleNullable : Producer<double?, AverageDoubleNullable._>
  282. {
  283. private readonly IObservable<double?> _source;
  284. public AverageDoubleNullable(IObservable<double?> source)
  285. {
  286. _source = source;
  287. }
  288. protected override _ CreateSink(IObserver<double?> observer) => new(observer);
  289. protected override void Run(_ sink) => sink.Run(_source);
  290. internal sealed class _ : IdentitySink<double?>
  291. {
  292. private double _sum;
  293. private long _count;
  294. public _(IObserver<double?> observer)
  295. : base(observer)
  296. {
  297. _sum = 0.0;
  298. _count = 0L;
  299. }
  300. public override void OnNext(double? value)
  301. {
  302. try
  303. {
  304. checked
  305. {
  306. if (value != null)
  307. {
  308. _sum += value.Value;
  309. _count++;
  310. }
  311. }
  312. }
  313. catch (Exception ex)
  314. {
  315. ForwardOnError(ex);
  316. }
  317. }
  318. public override void OnCompleted()
  319. {
  320. if (_count > 0)
  321. {
  322. ForwardOnNext(_sum / _count);
  323. }
  324. else
  325. {
  326. ForwardOnNext(null);
  327. }
  328. ForwardOnCompleted();
  329. }
  330. }
  331. }
  332. internal sealed class AverageSingleNullable : Producer<float?, AverageSingleNullable._>
  333. {
  334. private readonly IObservable<float?> _source;
  335. public AverageSingleNullable(IObservable<float?> source)
  336. {
  337. _source = source;
  338. }
  339. protected override _ CreateSink(IObserver<float?> observer) => new(observer);
  340. protected override void Run(_ sink) => sink.Run(_source);
  341. internal sealed class _ : IdentitySink<float?>
  342. {
  343. private double _sum; // NOTE: Uses a different accumulator type (double), conform LINQ to Objects.
  344. private long _count;
  345. public _(IObserver<float?> observer)
  346. : base(observer)
  347. {
  348. _sum = 0.0;
  349. _count = 0L;
  350. }
  351. public override void OnNext(float? value)
  352. {
  353. try
  354. {
  355. checked
  356. {
  357. if (value != null)
  358. {
  359. _sum += value.Value;
  360. _count++;
  361. }
  362. }
  363. }
  364. catch (Exception ex)
  365. {
  366. ForwardOnError(ex);
  367. }
  368. }
  369. public override void OnCompleted()
  370. {
  371. if (_count > 0)
  372. {
  373. ForwardOnNext((float)(_sum / _count));
  374. }
  375. else
  376. {
  377. ForwardOnNext(null);
  378. }
  379. ForwardOnCompleted();
  380. }
  381. }
  382. }
  383. internal sealed class AverageDecimalNullable : Producer<decimal?, AverageDecimalNullable._>
  384. {
  385. private readonly IObservable<decimal?> _source;
  386. public AverageDecimalNullable(IObservable<decimal?> source)
  387. {
  388. _source = source;
  389. }
  390. protected override _ CreateSink(IObserver<decimal?> observer) => new(observer);
  391. protected override void Run(_ sink) => sink.Run(_source);
  392. internal sealed class _ : IdentitySink<decimal?>
  393. {
  394. private decimal _sum;
  395. private long _count;
  396. public _(IObserver<decimal?> observer)
  397. : base(observer)
  398. {
  399. _sum = 0M;
  400. _count = 0L;
  401. }
  402. public override void OnNext(decimal? value)
  403. {
  404. try
  405. {
  406. checked
  407. {
  408. if (value != null)
  409. {
  410. _sum += value.Value;
  411. _count++;
  412. }
  413. }
  414. }
  415. catch (Exception ex)
  416. {
  417. ForwardOnError(ex);
  418. }
  419. }
  420. public override void OnCompleted()
  421. {
  422. if (_count > 0)
  423. {
  424. ForwardOnNext(_sum / _count);
  425. }
  426. else
  427. {
  428. ForwardOnNext(null);
  429. }
  430. ForwardOnCompleted();
  431. }
  432. }
  433. }
  434. internal sealed class AverageInt32Nullable : Producer<double?, AverageInt32Nullable._>
  435. {
  436. private readonly IObservable<int?> _source;
  437. public AverageInt32Nullable(IObservable<int?> source)
  438. {
  439. _source = source;
  440. }
  441. protected override _ CreateSink(IObserver<double?> observer) => new(observer);
  442. protected override void Run(_ sink) => sink.Run(_source);
  443. internal sealed class _ : Sink<int?, double?>
  444. {
  445. private long _sum;
  446. private long _count;
  447. public _(IObserver<double?> observer)
  448. : base(observer)
  449. {
  450. _sum = 0L;
  451. _count = 0L;
  452. }
  453. public override void OnNext(int? value)
  454. {
  455. try
  456. {
  457. checked
  458. {
  459. if (value != null)
  460. {
  461. _sum += value.Value;
  462. _count++;
  463. }
  464. }
  465. }
  466. catch (Exception ex)
  467. {
  468. ForwardOnError(ex);
  469. }
  470. }
  471. public override void OnCompleted()
  472. {
  473. if (_count > 0)
  474. {
  475. ForwardOnNext((double)_sum / _count);
  476. }
  477. else
  478. {
  479. ForwardOnNext(null);
  480. }
  481. ForwardOnCompleted();
  482. }
  483. }
  484. }
  485. internal sealed class AverageInt64Nullable : Producer<double?, AverageInt64Nullable._>
  486. {
  487. private readonly IObservable<long?> _source;
  488. public AverageInt64Nullable(IObservable<long?> source)
  489. {
  490. _source = source;
  491. }
  492. protected override _ CreateSink(IObserver<double?> observer) => new(observer);
  493. protected override void Run(_ sink) => sink.Run(_source);
  494. internal sealed class _ : Sink<long?, double?>
  495. {
  496. private long _sum;
  497. private long _count;
  498. public _(IObserver<double?> observer)
  499. : base(observer)
  500. {
  501. _sum = 0L;
  502. _count = 0L;
  503. }
  504. public override void OnNext(long? value)
  505. {
  506. try
  507. {
  508. checked
  509. {
  510. if (value != null)
  511. {
  512. _sum += value.Value;
  513. _count++;
  514. }
  515. }
  516. }
  517. catch (Exception ex)
  518. {
  519. ForwardOnError(ex);
  520. }
  521. }
  522. public override void OnCompleted()
  523. {
  524. if (_count > 0)
  525. {
  526. ForwardOnNext((double)_sum / _count);
  527. }
  528. else
  529. {
  530. ForwardOnNext(null);
  531. }
  532. ForwardOnCompleted();
  533. }
  534. }
  535. }
  536. }