Max.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  /// <summary>
  /// Producer whose sink keeps the greatest element seen so far, as ordered by a
  /// caller-supplied comparer, and hands that single element on when the source completes.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  internal sealed class Max<TSource> : Producer<TSource, Max<TSource>._>
  {
      private readonly IObservable<TSource> _source;
      private readonly IComparer<TSource> _comparer;

      /// <param name="source">Sequence whose maximum is requested.</param>
      /// <param name="comparer">Comparer used to order two elements.</param>
      public Max(IObservable<TSource> source, IComparer<TSource> comparer)
      {
          _source = source;
          _comparer = comparer;
      }

      // Types whose default value is null get the null-skipping sink; every other
      // type gets the sink that reports an error for an empty sequence.
      protected override _ CreateSink(IObserver<TSource> observer)
      {
          if (default(TSource) == null)
          {
              return new Null(_comparer, observer);
          }

          return new NonNull(_comparer, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Common base of both sinks; it only stores the comparer for the derived classes.
      /// </summary>
      internal abstract class _ : IdentitySink<TSource>
      {
          protected readonly IComparer<TSource> _comparer;

          public _(IComparer<TSource> comparer, IObserver<TSource> observer)
              : base(observer)
          {
              _comparer = comparer;
          }
      }

      /// <summary>
      /// Sink chosen when <c>default(TSource)</c> is not null. Completing without
      /// having received any element is reported through <c>ForwardOnError</c>.
      /// </summary>
      private sealed class NonNull : _
      {
          // Both fields start at their default values (false / default(TSource)).
          private bool _seenAny;
          private TSource _best;

          public NonNull(IComparer<TSource> comparer, IObserver<TSource> observer)
              : base(comparer, observer)
          {
          }

          public override void OnNext(TSource value)
          {
              // The very first element becomes the candidate without consulting the comparer.
              if (!_seenAny)
              {
                  _seenAny = true;
                  _best = value;
                  return;
              }

              int order;
              try
              {
                  order = _comparer.Compare(value, _best);
              }
              catch (Exception ex)
              {
                  // An exception thrown by the comparer is passed to ForwardOnError instead of escaping.
                  ForwardOnError(ex);
                  return;
              }

              // Only a strictly greater element replaces the candidate; ties keep the earlier one.
              if (order > 0)
              {
                  _best = value;
              }
          }

          public override void OnCompleted()
          {
              if (_seenAny)
              {
                  ForwardOnNext(_best);
                  ForwardOnCompleted();
                  return;
              }

              ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
          }
      }

      /// <summary>
      /// Sink chosen when <c>default(TSource)</c> is null. Null elements are skipped;
      /// if no non-null element arrives, the default value is forwarded on completion.
      /// </summary>
      private sealed class Null : _
      {
          // Starts at default(TSource), which is null for the types routed to this sink.
          private TSource _best;

          public Null(IComparer<TSource> comparer, IObserver<TSource> observer)
              : base(comparer, observer)
          {
          }

          public override void OnNext(TSource value)
          {
              // Null elements never take part in the comparison.
              if (value == null)
              {
                  return;
              }

              // No candidate yet: accept the first non-null element as is.
              if (_best == null)
              {
                  _best = value;
                  return;
              }

              int order;
              try
              {
                  order = _comparer.Compare(value, _best);
              }
              catch (Exception ex)
              {
                  // An exception thrown by the comparer is passed to ForwardOnError instead of escaping.
                  ForwardOnError(ex);
                  return;
              }

              if (order > 0)
              {
                  _best = value;
              }
          }

          public override void OnError(Exception error)
          {
              ForwardOnError(error);
          }

          public override void OnCompleted()
          {
              ForwardOnNext(_best);
              ForwardOnCompleted();
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>double</c> values
  /// and forwards it when the source completes.
  /// </summary>
  internal sealed class MaxDouble : Producer<double, MaxDouble._>
  {
      private readonly IObservable<double> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxDouble(IObservable<double> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<double> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. Completing without any element is
      /// reported through <c>ForwardOnError</c>.
      /// </summary>
      internal sealed class _ : IdentitySink<double>
      {
          // Both fields start at their default values (false / 0.0).
          private bool _seenAny;
          private double _max;

          public _(IObserver<double> observer)
              : base(observer)
          {
          }

          public override void OnNext(double value)
          {
              // The first element is taken unconditionally.
              if (!_seenAny)
              {
                  _max = value;
                  _seenAny = true;
                  return;
              }

              // A NaN always replaces the candidate; once the candidate is NaN, the
              // greater-than test is false for every number, so only another NaN can overwrite it.
              if (double.IsNaN(value) || value > _max)
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              if (_seenAny)
              {
                  ForwardOnNext(_max);
                  ForwardOnCompleted();
                  return;
              }

              ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>float</c> values
  /// and forwards it when the source completes.
  /// </summary>
  internal sealed class MaxSingle : Producer<float, MaxSingle._>
  {
      private readonly IObservable<float> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxSingle(IObservable<float> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<float> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. Completing without any element is
      /// reported through <c>ForwardOnError</c>.
      /// </summary>
      internal sealed class _ : IdentitySink<float>
      {
          // Both fields start at their default values (false / 0.0f).
          private bool _seenAny;
          private float _max;

          public _(IObserver<float> observer)
              : base(observer)
          {
          }

          public override void OnNext(float value)
          {
              // The first element is taken unconditionally.
              if (!_seenAny)
              {
                  _max = value;
                  _seenAny = true;
                  return;
              }

              // A NaN always replaces the candidate; once the candidate is NaN, the
              // greater-than test is false for every number, so only another NaN can overwrite it.
              if (float.IsNaN(value) || value > _max)
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              if (_seenAny)
              {
                  ForwardOnNext(_max);
                  ForwardOnCompleted();
                  return;
              }

              ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>decimal</c> values
  /// and forwards it when the source completes.
  /// </summary>
  internal sealed class MaxDecimal : Producer<decimal, MaxDecimal._>
  {
      private readonly IObservable<decimal> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxDecimal(IObservable<decimal> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<decimal> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. Completing without any element is
      /// reported through <c>ForwardOnError</c>.
      /// </summary>
      internal sealed class _ : IdentitySink<decimal>
      {
          // Both fields start at their default values (false / 0m).
          private bool _seenAny;
          private decimal _max;

          public _(IObserver<decimal> observer)
              : base(observer)
          {
          }

          public override void OnNext(decimal value)
          {
              // The first element is taken unconditionally.
              if (!_seenAny)
              {
                  _max = value;
                  _seenAny = true;
                  return;
              }

              // Afterwards only a strictly greater value replaces the candidate.
              if (value > _max)
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              if (_seenAny)
              {
                  ForwardOnNext(_max);
                  ForwardOnCompleted();
                  return;
              }

              ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>int</c> values
  /// and forwards it when the source completes.
  /// </summary>
  internal sealed class MaxInt32 : Producer<int, MaxInt32._>
  {
      private readonly IObservable<int> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxInt32(IObservable<int> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<int> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. Completing without any element is
      /// reported through <c>ForwardOnError</c>.
      /// </summary>
      internal sealed class _ : IdentitySink<int>
      {
          // Both fields start at their default values (false / 0).
          private bool _seenAny;
          private int _max;

          public _(IObserver<int> observer)
              : base(observer)
          {
          }

          public override void OnNext(int value)
          {
              // The first element is taken unconditionally.
              if (!_seenAny)
              {
                  _max = value;
                  _seenAny = true;
                  return;
              }

              // Afterwards only a strictly greater value replaces the candidate.
              if (value > _max)
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              if (_seenAny)
              {
                  ForwardOnNext(_max);
                  ForwardOnCompleted();
                  return;
              }

              ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>long</c> values
  /// and forwards it when the source completes.
  /// </summary>
  internal sealed class MaxInt64 : Producer<long, MaxInt64._>
  {
      private readonly IObservable<long> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxInt64(IObservable<long> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<long> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. Completing without any element is
      /// reported through <c>ForwardOnError</c>.
      /// </summary>
      internal sealed class _ : IdentitySink<long>
      {
          // Both fields start at their default values (false / 0L).
          private bool _seenAny;
          private long _max;

          public _(IObserver<long> observer)
              : base(observer)
          {
          }

          public override void OnNext(long value)
          {
              // The first element is taken unconditionally.
              if (!_seenAny)
              {
                  _max = value;
                  _seenAny = true;
                  return;
              }

              // Afterwards only a strictly greater value replaces the candidate.
              if (value > _max)
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              if (_seenAny)
              {
                  ForwardOnNext(_max);
                  ForwardOnCompleted();
                  return;
              }

              ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>double?</c> values,
  /// skipping nulls, and forwards the result when the source completes.
  /// </summary>
  internal sealed class MaxDoubleNullable : Producer<double?, MaxDoubleNullable._>
  {
      private readonly IObservable<double?> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxDoubleNullable(IObservable<double?> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<double?> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. If no non-null element arrives, null is
      /// forwarded on completion.
      /// </summary>
      internal sealed class _ : IdentitySink<double?>
      {
          // Starts as null, meaning "no candidate yet".
          private double? _max;

          public _(IObserver<double?> observer)
              : base(observer)
          {
          }

          public override void OnNext(double? value)
          {
              // Null elements are ignored.
              if (!value.HasValue)
              {
                  return;
              }

              var candidate = value.Value;

              // Accept the element when there is no candidate yet, when it is strictly
              // greater than the candidate, or when it is NaN (NaN always wins).
              if (!_max.HasValue || candidate > _max.Value || double.IsNaN(candidate))
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              ForwardOnNext(_max);
              ForwardOnCompleted();
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>float?</c> values,
  /// skipping nulls, and forwards the result when the source completes.
  /// </summary>
  internal sealed class MaxSingleNullable : Producer<float?, MaxSingleNullable._>
  {
      private readonly IObservable<float?> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxSingleNullable(IObservable<float?> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<float?> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. If no non-null element arrives, null is
      /// forwarded on completion.
      /// </summary>
      internal sealed class _ : IdentitySink<float?>
      {
          // Starts as null, meaning "no candidate yet".
          private float? _max;

          public _(IObserver<float?> observer)
              : base(observer)
          {
          }

          public override void OnNext(float? value)
          {
              // Null elements are ignored.
              if (!value.HasValue)
              {
                  return;
              }

              var candidate = value.Value;

              // Accept the element when there is no candidate yet, when it is strictly
              // greater than the candidate, or when it is NaN (NaN always wins).
              if (!_max.HasValue || candidate > _max.Value || float.IsNaN(candidate))
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              ForwardOnNext(_max);
              ForwardOnCompleted();
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>decimal?</c> values,
  /// skipping nulls, and forwards the result when the source completes.
  /// </summary>
  internal sealed class MaxDecimalNullable : Producer<decimal?, MaxDecimalNullable._>
  {
      private readonly IObservable<decimal?> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxDecimalNullable(IObservable<decimal?> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<decimal?> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. If no non-null element arrives, null is
      /// forwarded on completion.
      /// </summary>
      internal sealed class _ : IdentitySink<decimal?>
      {
          // Starts as null, meaning "no candidate yet".
          private decimal? _max;

          public _(IObserver<decimal?> observer)
              : base(observer)
          {
          }

          public override void OnNext(decimal? value)
          {
              // Null elements are ignored.
              if (!value.HasValue)
              {
                  return;
              }

              // Accept the element when there is no candidate yet or when it is
              // strictly greater than the candidate.
              if (!_max.HasValue || value.Value > _max.Value)
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              ForwardOnNext(_max);
              ForwardOnCompleted();
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>int?</c> values,
  /// skipping nulls, and forwards the result when the source completes.
  /// </summary>
  internal sealed class MaxInt32Nullable : Producer<int?, MaxInt32Nullable._>
  {
      private readonly IObservable<int?> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxInt32Nullable(IObservable<int?> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<int?> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. If no non-null element arrives, null is
      /// forwarded on completion.
      /// </summary>
      internal sealed class _ : IdentitySink<int?>
      {
          // Starts as null, meaning "no candidate yet".
          private int? _max;

          public _(IObserver<int?> observer)
              : base(observer)
          {
          }

          public override void OnNext(int? value)
          {
              // Null elements are ignored.
              if (!value.HasValue)
              {
                  return;
              }

              // Accept the element when there is no candidate yet or when it is
              // strictly greater than the candidate.
              if (!_max.HasValue || value.Value > _max.Value)
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              ForwardOnNext(_max);
              ForwardOnCompleted();
          }
      }
  }
  /// <summary>
  /// Producer whose sink tracks the maximum of a sequence of <c>long?</c> values,
  /// skipping nulls, and forwards the result when the source completes.
  /// </summary>
  internal sealed class MaxInt64Nullable : Producer<long?, MaxInt64Nullable._>
  {
      private readonly IObservable<long?> _source;

      /// <param name="source">Sequence whose maximum is requested.</param>
      public MaxInt64Nullable(IObservable<long?> source)
      {
          _source = source;
      }

      protected override _ CreateSink(IObserver<long?> observer)
      {
          return new _(observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink holding the running maximum. If no non-null element arrives, null is
      /// forwarded on completion.
      /// </summary>
      internal sealed class _ : IdentitySink<long?>
      {
          // Starts as null, meaning "no candidate yet".
          private long? _max;

          public _(IObserver<long?> observer)
              : base(observer)
          {
          }

          public override void OnNext(long? value)
          {
              // Null elements are ignored.
              if (!value.HasValue)
              {
                  return;
              }

              // Accept the element when there is no candidate yet or when it is
              // strictly greater than the candidate.
              if (!_max.HasValue || value.Value > _max.Value)
              {
                  _max = value;
              }
          }

          public override void OnCompleted()
          {
              ForwardOnNext(_max);
              ForwardOnCompleted();
          }
      }
  }
  561. }