Sum.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. namespace System.Reactive.Linq.Observαble
  5. {
  6. class SumDouble : Producer<double>
  7. {
  8. private readonly IObservable<double> _source;
  9. public SumDouble(IObservable<double> source)
  10. {
  11. _source = source;
  12. }
  13. protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
  14. {
  15. var sink = new _(observer, cancel);
  16. setSink(sink);
  17. return _source.SubscribeSafe(sink);
  18. }
  19. class _ : Sink<double>, IObserver<double>
  20. {
  21. private double _sum;
  22. public _(IObserver<double> observer, IDisposable cancel)
  23. : base(observer, cancel)
  24. {
  25. _sum = 0.0;
  26. }
  27. public void OnNext(double value)
  28. {
  29. _sum += value;
  30. }
  31. public void OnError(Exception error)
  32. {
  33. base._observer.OnError(error);
  34. base.Dispose();
  35. }
  36. public void OnCompleted()
  37. {
  38. base._observer.OnNext(_sum);
  39. base._observer.OnCompleted();
  40. base.Dispose();
  41. }
  42. }
  43. }
  44. class SumSingle : Producer<float>
  45. {
  46. private readonly IObservable<float> _source;
  47. public SumSingle(IObservable<float> source)
  48. {
  49. _source = source;
  50. }
  51. protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
  52. {
  53. var sink = new _(observer, cancel);
  54. setSink(sink);
  55. return _source.SubscribeSafe(sink);
  56. }
  57. class _ : Sink<float>, IObserver<float>
  58. {
  59. private double _sum; // This is what LINQ to Objects does!
  60. public _(IObserver<float> observer, IDisposable cancel)
  61. : base(observer, cancel)
  62. {
  63. _sum = 0.0; // This is what LINQ to Objects does!
  64. }
  65. public void OnNext(float value)
  66. {
  67. _sum += value; // This is what LINQ to Objects does!
  68. }
  69. public void OnError(Exception error)
  70. {
  71. base._observer.OnError(error);
  72. base.Dispose();
  73. }
  74. public void OnCompleted()
  75. {
  76. base._observer.OnNext((float)_sum); // This is what LINQ to Objects does!
  77. base._observer.OnCompleted();
  78. base.Dispose();
  79. }
  80. }
  81. }
  82. class SumDecimal : Producer<decimal>
  83. {
  84. private readonly IObservable<decimal> _source;
  85. public SumDecimal(IObservable<decimal> source)
  86. {
  87. _source = source;
  88. }
  89. protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
  90. {
  91. var sink = new _(observer, cancel);
  92. setSink(sink);
  93. return _source.SubscribeSafe(sink);
  94. }
  95. class _ : Sink<decimal>, IObserver<decimal>
  96. {
  97. private decimal _sum;
  98. public _(IObserver<decimal> observer, IDisposable cancel)
  99. : base(observer, cancel)
  100. {
  101. _sum = 0M;
  102. }
  103. public void OnNext(decimal value)
  104. {
  105. _sum += value;
  106. }
  107. public void OnError(Exception error)
  108. {
  109. base._observer.OnError(error);
  110. base.Dispose();
  111. }
  112. public void OnCompleted()
  113. {
  114. base._observer.OnNext(_sum);
  115. base._observer.OnCompleted();
  116. base.Dispose();
  117. }
  118. }
  119. }
  120. class SumInt32 : Producer<int>
  121. {
  122. private readonly IObservable<int> _source;
  123. public SumInt32(IObservable<int> source)
  124. {
  125. _source = source;
  126. }
  127. protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
  128. {
  129. var sink = new _(observer, cancel);
  130. setSink(sink);
  131. return _source.SubscribeSafe(sink);
  132. }
  133. class _ : Sink<int>, IObserver<int>
  134. {
  135. private int _sum;
  136. public _(IObserver<int> observer, IDisposable cancel)
  137. : base(observer, cancel)
  138. {
  139. _sum = 0;
  140. }
  141. public void OnNext(int value)
  142. {
  143. try
  144. {
  145. checked
  146. {
  147. _sum += value;
  148. }
  149. }
  150. catch (Exception exception)
  151. {
  152. base._observer.OnError(exception);
  153. base.Dispose();
  154. }
  155. }
  156. public void OnError(Exception error)
  157. {
  158. base._observer.OnError(error);
  159. base.Dispose();
  160. }
  161. public void OnCompleted()
  162. {
  163. base._observer.OnNext(_sum);
  164. base._observer.OnCompleted();
  165. base.Dispose();
  166. }
  167. }
  168. }
  169. class SumInt64 : Producer<long>
  170. {
  171. private readonly IObservable<long> _source;
  172. public SumInt64(IObservable<long> source)
  173. {
  174. _source = source;
  175. }
  176. protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
  177. {
  178. var sink = new _(observer, cancel);
  179. setSink(sink);
  180. return _source.SubscribeSafe(sink);
  181. }
  182. class _ : Sink<long>, IObserver<long>
  183. {
  184. private long _sum;
  185. public _(IObserver<long> observer, IDisposable cancel)
  186. : base(observer, cancel)
  187. {
  188. _sum = 0L;
  189. }
  190. public void OnNext(long value)
  191. {
  192. try
  193. {
  194. checked
  195. {
  196. _sum += value;
  197. }
  198. }
  199. catch (Exception exception)
  200. {
  201. base._observer.OnError(exception);
  202. base.Dispose();
  203. }
  204. }
  205. public void OnError(Exception error)
  206. {
  207. base._observer.OnError(error);
  208. base.Dispose();
  209. }
  210. public void OnCompleted()
  211. {
  212. base._observer.OnNext(_sum);
  213. base._observer.OnCompleted();
  214. base.Dispose();
  215. }
  216. }
  217. }
  218. class SumDoubleNullable : Producer<double?>
  219. {
  220. private readonly IObservable<double?> _source;
  221. public SumDoubleNullable(IObservable<double?> source)
  222. {
  223. _source = source;
  224. }
  225. protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
  226. {
  227. var sink = new _(observer, cancel);
  228. setSink(sink);
  229. return _source.SubscribeSafe(sink);
  230. }
  231. class _ : Sink<double?>, IObserver<double?>
  232. {
  233. private double _sum;
  234. public _(IObserver<double?> observer, IDisposable cancel)
  235. : base(observer, cancel)
  236. {
  237. _sum = 0.0;
  238. }
  239. public void OnNext(double? value)
  240. {
  241. if (value != null)
  242. _sum += value.Value;
  243. }
  244. public void OnError(Exception error)
  245. {
  246. base._observer.OnError(error);
  247. base.Dispose();
  248. }
  249. public void OnCompleted()
  250. {
  251. base._observer.OnNext(_sum);
  252. base._observer.OnCompleted();
  253. base.Dispose();
  254. }
  255. }
  256. }
  257. class SumSingleNullable : Producer<float?>
  258. {
  259. private readonly IObservable<float?> _source;
  260. public SumSingleNullable(IObservable<float?> source)
  261. {
  262. _source = source;
  263. }
  264. protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
  265. {
  266. var sink = new _(observer, cancel);
  267. setSink(sink);
  268. return _source.SubscribeSafe(sink);
  269. }
  270. class _ : Sink<float?>, IObserver<float?>
  271. {
  272. private double _sum; // This is what LINQ to Objects does!
  273. public _(IObserver<float?> observer, IDisposable cancel)
  274. : base(observer, cancel)
  275. {
  276. _sum = 0.0; // This is what LINQ to Objects does!
  277. }
  278. public void OnNext(float? value)
  279. {
  280. if (value != null)
  281. _sum += value.Value; // This is what LINQ to Objects does!
  282. }
  283. public void OnError(Exception error)
  284. {
  285. base._observer.OnError(error);
  286. base.Dispose();
  287. }
  288. public void OnCompleted()
  289. {
  290. base._observer.OnNext((float)_sum); // This is what LINQ to Objects does!
  291. base._observer.OnCompleted();
  292. base.Dispose();
  293. }
  294. }
  295. }
  296. class SumDecimalNullable : Producer<decimal?>
  297. {
  298. private readonly IObservable<decimal?> _source;
  299. public SumDecimalNullable(IObservable<decimal?> source)
  300. {
  301. _source = source;
  302. }
  303. protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
  304. {
  305. var sink = new _(observer, cancel);
  306. setSink(sink);
  307. return _source.SubscribeSafe(sink);
  308. }
  309. class _ : Sink<decimal?>, IObserver<decimal?>
  310. {
  311. private decimal _sum;
  312. public _(IObserver<decimal?> observer, IDisposable cancel)
  313. : base(observer, cancel)
  314. {
  315. _sum = 0M;
  316. }
  317. public void OnNext(decimal? value)
  318. {
  319. if (value != null)
  320. _sum += value.Value;
  321. }
  322. public void OnError(Exception error)
  323. {
  324. base._observer.OnError(error);
  325. base.Dispose();
  326. }
  327. public void OnCompleted()
  328. {
  329. base._observer.OnNext(_sum);
  330. base._observer.OnCompleted();
  331. base.Dispose();
  332. }
  333. }
  334. }
  335. class SumInt32Nullable : Producer<int?>
  336. {
  337. private readonly IObservable<int?> _source;
  338. public SumInt32Nullable(IObservable<int?> source)
  339. {
  340. _source = source;
  341. }
  342. protected override IDisposable Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)
  343. {
  344. var sink = new _(observer, cancel);
  345. setSink(sink);
  346. return _source.SubscribeSafe(sink);
  347. }
  348. class _ : Sink<int?>, IObserver<int?>
  349. {
  350. private int _sum;
  351. public _(IObserver<int?> observer, IDisposable cancel)
  352. : base(observer, cancel)
  353. {
  354. _sum = 0;
  355. }
  356. public void OnNext(int? value)
  357. {
  358. try
  359. {
  360. checked
  361. {
  362. if (value != null)
  363. _sum += value.Value;
  364. }
  365. }
  366. catch (Exception exception)
  367. {
  368. base._observer.OnError(exception);
  369. base.Dispose();
  370. }
  371. }
  372. public void OnError(Exception error)
  373. {
  374. base._observer.OnError(error);
  375. base.Dispose();
  376. }
  377. public void OnCompleted()
  378. {
  379. base._observer.OnNext(_sum);
  380. base._observer.OnCompleted();
  381. base.Dispose();
  382. }
  383. }
  384. }
  385. class SumInt64Nullable : Producer<long?>
  386. {
  387. private readonly IObservable<long?> _source;
  388. public SumInt64Nullable(IObservable<long?> source)
  389. {
  390. _source = source;
  391. }
  392. protected override IDisposable Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)
  393. {
  394. var sink = new _(observer, cancel);
  395. setSink(sink);
  396. return _source.SubscribeSafe(sink);
  397. }
  398. class _ : Sink<long?>, IObserver<long?>
  399. {
  400. private long _sum;
  401. public _(IObserver<long?> observer, IDisposable cancel)
  402. : base(observer, cancel)
  403. {
  404. _sum = 0L;
  405. }
  406. public void OnNext(long? value)
  407. {
  408. try
  409. {
  410. checked
  411. {
  412. if (value != null)
  413. _sum += value.Value;
  414. }
  415. }
  416. catch (Exception exception)
  417. {
  418. base._observer.OnError(exception);
  419. base.Dispose();
  420. }
  421. }
  422. public void OnError(Exception error)
  423. {
  424. base._observer.OnError(error);
  425. base.Dispose();
  426. }
  427. public void OnCompleted()
  428. {
  429. base._observer.OnNext(_sum);
  430. base._observer.OnCompleted();
  431. base.Dispose();
  432. }
  433. }
  434. }
  435. }
  436. #endif