QueryLanguage.Aggregates.cs 54 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Reactive.Disposables;
  7. namespace System.Reactive.Linq
  8. {
  9. #if !NO_PERF
  10. using ObservableImpl;
  11. #endif
  12. internal partial class QueryLanguage
  13. {
  14. #region + Aggregate +
  15. public virtual IObservable<TAccumulate> Aggregate<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  16. {
  17. #if !NO_PERF
  18. return new Aggregate<TSource, TAccumulate, TAccumulate>(source, seed, accumulator, Stubs<TAccumulate>.I);
  19. #else
  20. return source.Scan(seed, accumulator).StartWith(seed).Final();
  21. #endif
  22. }
  23. public virtual IObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
  24. {
  25. #if !NO_PERF
  26. return new Aggregate<TSource, TAccumulate, TResult>(source, seed, accumulator, resultSelector);
  27. #else
  28. return Aggregate(source, seed, accumulator).Select(resultSelector);
  29. #endif
  30. }
  31. public virtual IObservable<TSource> Aggregate<TSource>(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  32. {
  33. #if !NO_PERF
  34. return new Aggregate<TSource>(source, accumulator);
  35. #else
  36. return source.Scan(accumulator).Final();
  37. #endif
  38. }
  39. public virtual IObservable<double> Average<TSource>(IObservable<TSource> source, Func<TSource, double> selector)
  40. {
  41. return Average(Select(source, selector));
  42. }
  43. public virtual IObservable<float> Average<TSource>(IObservable<TSource> source, Func<TSource, float> selector)
  44. {
  45. return Average(Select(source, selector));
  46. }
  47. public virtual IObservable<decimal> Average<TSource>(IObservable<TSource> source, Func<TSource, decimal> selector)
  48. {
  49. return Average(Select(source, selector));
  50. }
  51. public virtual IObservable<double> Average<TSource>(IObservable<TSource> source, Func<TSource, int> selector)
  52. {
  53. return Average(Select(source, selector));
  54. }
  55. public virtual IObservable<double> Average<TSource>(IObservable<TSource> source, Func<TSource, long> selector)
  56. {
  57. return Average(Select(source, selector));
  58. }
  59. public virtual IObservable<double?> Average<TSource>(IObservable<TSource> source, Func<TSource, double?> selector)
  60. {
  61. return Average(Select(source, selector));
  62. }
  63. public virtual IObservable<float?> Average<TSource>(IObservable<TSource> source, Func<TSource, float?> selector)
  64. {
  65. return Average(Select(source, selector));
  66. }
  67. public virtual IObservable<decimal?> Average<TSource>(IObservable<TSource> source, Func<TSource, decimal?> selector)
  68. {
  69. return Average(Select(source, selector));
  70. }
  71. public virtual IObservable<double?> Average<TSource>(IObservable<TSource> source, Func<TSource, int?> selector)
  72. {
  73. return Average(Select(source, selector));
  74. }
  75. public virtual IObservable<double?> Average<TSource>(IObservable<TSource> source, Func<TSource, long?> selector)
  76. {
  77. return Average(Select(source, selector));
  78. }
  79. #endregion
  80. #region + All +
  81. public virtual IObservable<bool> All<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  82. {
  83. #if !NO_PERF
  84. return new All<TSource>(source, predicate);
  85. #else
  86. return source.Where(v => !(predicate(v))).Any().Select(b => !b);
  87. #endif
  88. }
  89. #endregion
  90. #region + Any +
  91. public virtual IObservable<bool> Any<TSource>(IObservable<TSource> source)
  92. {
  93. #if !NO_PERF
  94. return new Any<TSource>(source);
  95. #else
  96. return new AnonymousObservable<bool>(observer => source.Subscribe(
  97. _ =>
  98. {
  99. observer.OnNext(true);
  100. observer.OnCompleted();
  101. },
  102. observer.OnError,
  103. () =>
  104. {
  105. observer.OnNext(false);
  106. observer.OnCompleted();
  107. }));
  108. #endif
  109. }
  110. public virtual IObservable<bool> Any<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  111. {
  112. #if !NO_PERF
  113. return new Any<TSource>(source, predicate);
  114. #else
  115. return source.Where(predicate).Any();
  116. #endif
  117. }
  118. #endregion
  119. #region + Average +
  120. public virtual IObservable<double> Average(IObservable<double> source)
  121. {
  122. #if !NO_PERF
  123. return new AverageDouble(source);
  124. #else
  125. return source.Scan(new { sum = 0.0, count = 0L },
  126. (prev, cur) => new { sum = prev.sum + cur, count = checked(prev.count + 1) })
  127. .Final()
  128. .Select(s => s.sum / (double)s.count);
  129. #endif
  130. }
  131. public virtual IObservable<float> Average(IObservable<float> source)
  132. {
  133. #if !NO_PERF
  134. return new AverageSingle(source);
  135. #else
  136. return source.Scan(new { sum = 0F, count = 0L }, // NOTE: Uses a different accumulator type (float), *not* conform LINQ to Objects.
  137. (prev, cur) => new { sum = prev.sum + cur, count = checked(prev.count + 1) })
  138. .Final()
  139. .Select(s => s.sum / (float)s.count);
  140. #endif
  141. }
  142. public virtual IObservable<decimal> Average(IObservable<decimal> source)
  143. {
  144. #if !NO_PERF
  145. return new AverageDecimal(source);
  146. #else
  147. return source.Scan(new { sum = 0M, count = 0L },
  148. (prev, cur) => new { sum = prev.sum + cur, count = checked(prev.count + 1) })
  149. .Final()
  150. .Select(s => s.sum / (decimal)s.count);
  151. #endif
  152. }
  153. public virtual IObservable<double> Average(IObservable<int> source)
  154. {
  155. #if !NO_PERF
  156. return new AverageInt32(source);
  157. #else
  158. return source.Scan(new { sum = 0L, count = 0L },
  159. (prev, cur) => new { sum = checked(prev.sum + cur), count = checked(prev.count + 1) })
  160. .Final()
  161. .Select(s => (double)s.sum / (double)s.count);
  162. #endif
  163. }
  164. public virtual IObservable<double> Average(IObservable<long> source)
  165. {
  166. #if !NO_PERF
  167. return new AverageInt64(source);
  168. #else
  169. return source.Scan(new { sum = 0L, count = 0L },
  170. (prev, cur) => new { sum = checked(prev.sum + cur), count = checked(prev.count + 1) })
  171. .Final()
  172. .Select(s => (double)s.sum / (double)s.count);
  173. #endif
  174. }
  175. public virtual IObservable<double?> Average(IObservable<double?> source)
  176. {
  177. #if !NO_PERF
  178. return new AverageDoubleNullable(source);
  179. #else
  180. return source.Aggregate(new { sum = new double?(0.0), count = 0L },
  181. (prev, cur) => cur != null ? new { sum = prev.sum + cur.GetValueOrDefault(), count = checked(prev.count + 1) } : prev)
  182. .Select(s => s.count == 0 ? default(double?) : (double?)s.sum / (double)s.count);
  183. #endif
  184. }
  185. public virtual IObservable<float?> Average(IObservable<float?> source)
  186. {
  187. #if !NO_PERF
  188. return new AverageSingleNullable(source);
  189. #else
  190. return source.Aggregate(new { sum = new float?(0f), count = 0L }, // NOTE: Uses a different accumulator type (float), *not* conform LINQ to Objects.
  191. (prev, cur) => cur != null ? new { sum = prev.sum + cur.GetValueOrDefault(), count = checked(prev.count + 1) } : prev)
  192. .Select(s => s.count == 0 ? default(float?) : (float?)s.sum / (float)s.count);
  193. #endif
  194. }
  195. public virtual IObservable<decimal?> Average(IObservable<decimal?> source)
  196. {
  197. #if !NO_PERF
  198. return new AverageDecimalNullable(source);
  199. #else
  200. return source.Aggregate(new { sum = new decimal?(0M), count = 0L },
  201. (prev, cur) => cur != null ? new { sum = prev.sum + cur.GetValueOrDefault(), count = checked(prev.count + 1) } : prev)
  202. .Select(s => s.count == 0 ? default(decimal?) : (decimal?)s.sum / (decimal)s.count);
  203. #endif
  204. }
  205. public virtual IObservable<double?> Average(IObservable<int?> source)
  206. {
  207. #if !NO_PERF
  208. return new AverageInt32Nullable(source);
  209. #else
  210. return source.Aggregate(new { sum = new long?(0), count = 0L },
  211. (prev, cur) => cur != null ? new { sum = checked(prev.sum + cur.GetValueOrDefault()), count = checked(prev.count + 1) } : prev)
  212. .Select(s => s.count == 0 ? default(double?) : (double?)s.sum / s.count);
  213. #endif
  214. }
  215. public virtual IObservable<double?> Average(IObservable<long?> source)
  216. {
  217. #if !NO_PERF
  218. return new AverageInt64Nullable(source);
  219. #else
  220. return source.Aggregate(new { sum = new long?(0), count = 0L },
  221. (prev, cur) => cur != null ? new { sum = checked(prev.sum + cur.GetValueOrDefault()), count = checked(prev.count + 1) } : prev)
  222. .Select(s => s.count == 0 ? default(double?): (double?)s.sum / s.count);
  223. #endif
  224. }
  225. #endregion
  226. #region + Contains +
  227. public virtual IObservable<bool> Contains<TSource>(IObservable<TSource> source, TSource value)
  228. {
  229. #if !NO_PERF
  230. return new Contains<TSource>(source, value, EqualityComparer<TSource>.Default);
  231. #else
  232. return Contains_<TSource>(source, value, EqualityComparer<TSource>.Default);
  233. #endif
  234. }
  235. public virtual IObservable<bool> Contains<TSource>(IObservable<TSource> source, TSource value, IEqualityComparer<TSource> comparer)
  236. {
  237. #if !NO_PERF
  238. return new Contains<TSource>(source, value, comparer);
  239. #else
  240. return Contains_<TSource>(source, value, comparer);
  241. #endif
  242. }
  243. #if NO_PERF
  244. private static IObservable<bool> Contains_<TSource>(IObservable<TSource> source, TSource value, IEqualityComparer<TSource> comparer)
  245. {
  246. return source.Where(v => comparer.Equals(v, value)).Any();
  247. }
  248. #endif
  249. #endregion
  250. #region + Count +
  251. public virtual IObservable<int> Count<TSource>(IObservable<TSource> source)
  252. {
  253. #if !NO_PERF
  254. return new Count<TSource>(source);
  255. #else
  256. return source.Aggregate(0, (count, _) => checked(count + 1));
  257. #endif
  258. }
  259. public virtual IObservable<int> Count<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  260. {
  261. #if !NO_PERF
  262. return new Count<TSource>(source, predicate);
  263. #else
  264. return source.Where(predicate).Aggregate(0, (count, _) => checked(count + 1));
  265. #endif
  266. }
  267. #endregion
  268. #region + ElementAt +
  269. public virtual IObservable<TSource> ElementAt<TSource>(IObservable<TSource> source, int index)
  270. {
  271. #if !NO_PERF
  272. return new ElementAt<TSource>(source, index, true);
  273. #else
  274. return new AnonymousObservable<TSource>(observer =>
  275. {
  276. int i = index;
  277. return source.Subscribe(
  278. x =>
  279. {
  280. if (i == 0)
  281. {
  282. observer.OnNext(x);
  283. observer.OnCompleted();
  284. }
  285. i--;
  286. },
  287. observer.OnError,
  288. () => observer.OnError(new ArgumentOutOfRangeException("index"))
  289. );
  290. });
  291. #endif
  292. }
  293. #endregion
  294. #region + ElementAtOrDefault +
  295. public virtual IObservable<TSource> ElementAtOrDefault<TSource>(IObservable<TSource> source, int index)
  296. {
  297. #if !NO_PERF
  298. return new ElementAt<TSource>(source, index, false);
  299. #else
  300. return new AnonymousObservable<TSource>(observer =>
  301. {
  302. int i = index;
  303. return source.Subscribe(
  304. x =>
  305. {
  306. if (i == 0)
  307. {
  308. observer.OnNext(x);
  309. observer.OnCompleted();
  310. }
  311. i--;
  312. },
  313. observer.OnError,
  314. () =>
  315. {
  316. observer.OnNext(default(TSource));
  317. observer.OnCompleted();
  318. }
  319. );
  320. });
  321. #endif
  322. }
  323. #endregion
  324. #region + FirstAsync +
  325. public virtual IObservable<TSource> FirstAsync<TSource>(IObservable<TSource> source)
  326. {
  327. #if !NO_PERF
  328. return new FirstAsync<TSource>(source, null, true);
  329. #else
  330. return FirstOrDefaultAsync_(source, true);
  331. #endif
  332. }
  333. public virtual IObservable<TSource> FirstAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  334. {
  335. #if !NO_PERF
  336. return new FirstAsync<TSource>(source, predicate, true);
  337. #else
  338. return source.Where(predicate).FirstAsync();
  339. #endif
  340. }
  341. #endregion
  342. #region + FirstAsyncOrDefaultAsync +
  343. public virtual IObservable<TSource> FirstOrDefaultAsync<TSource>(IObservable<TSource> source)
  344. {
  345. #if !NO_PERF
  346. return new FirstAsync<TSource>(source, null, false);
  347. #else
  348. return FirstOrDefaultAsync_(source, false);
  349. #endif
  350. }
  351. public virtual IObservable<TSource> FirstOrDefaultAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  352. {
  353. #if !NO_PERF
  354. return new FirstAsync<TSource>(source, predicate, false);
  355. #else
  356. return source.Where(predicate).FirstOrDefaultAsync();
  357. #endif
  358. }
  359. #if NO_PERF
  360. private static IObservable<TSource> FirstOrDefaultAsync_<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  361. {
  362. return new AnonymousObservable<TSource>(observer =>
  363. {
  364. return source.Subscribe(
  365. x =>
  366. {
  367. observer.OnNext(x);
  368. observer.OnCompleted();
  369. },
  370. observer.OnError,
  371. () =>
  372. {
  373. if (throwOnEmpty)
  374. {
  375. observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
  376. }
  377. else
  378. {
  379. observer.OnNext(default(TSource));
  380. observer.OnCompleted();
  381. }
  382. }
  383. );
  384. });
  385. }
  386. #endif
  387. #endregion
  388. #region + IsEmpty +
  389. public virtual IObservable<bool> IsEmpty<TSource>(IObservable<TSource> source)
  390. {
  391. #if !NO_PERF
  392. return new IsEmpty<TSource>(source);
  393. #else
  394. return source.Any().Select(b => !b);
  395. #endif
  396. }
  397. #endregion
  398. #region + LastAsync +
  399. public virtual IObservable<TSource> LastAsync<TSource>(IObservable<TSource> source)
  400. {
  401. #if !NO_PERF
  402. return new LastAsync<TSource>(source, null, true);
  403. #else
  404. return LastOrDefaultAsync_(source, true);
  405. #endif
  406. }
  407. public virtual IObservable<TSource> LastAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  408. {
  409. #if !NO_PERF
  410. return new LastAsync<TSource>(source, predicate, true);
  411. #else
  412. return source.Where(predicate).LastAsync();
  413. #endif
  414. }
  415. #endregion
  416. #region + LastOrDefaultAsync +
  417. public virtual IObservable<TSource> LastOrDefaultAsync<TSource>(IObservable<TSource> source)
  418. {
  419. #if !NO_PERF
  420. return new LastAsync<TSource>(source, null, false);
  421. #else
  422. return LastOrDefaultAsync_(source, false);
  423. #endif
  424. }
  425. public virtual IObservable<TSource> LastOrDefaultAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  426. {
  427. #if !NO_PERF
  428. return new LastAsync<TSource>(source, predicate, false);
  429. #else
  430. return source.Where(predicate).LastOrDefaultAsync();
  431. #endif
  432. }
  433. #if NO_PERF
  434. private static IObservable<TSource> LastOrDefaultAsync_<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  435. {
  436. return new AnonymousObservable<TSource>(observer =>
  437. {
  438. var value = default(TSource);
  439. var seenValue = false;
  440. return source.Subscribe(
  441. x =>
  442. {
  443. value = x;
  444. seenValue = true;
  445. },
  446. observer.OnError,
  447. () =>
  448. {
  449. if (throwOnEmpty && !seenValue)
  450. {
  451. observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
  452. }
  453. else
  454. {
  455. observer.OnNext(value);
  456. observer.OnCompleted();
  457. }
  458. }
  459. );
  460. });
  461. }
  462. #endif
  463. #endregion
  464. #region + LongCount +
  465. public virtual IObservable<long> LongCount<TSource>(IObservable<TSource> source)
  466. {
  467. #if !NO_PERF
  468. return new LongCount<TSource>(source);
  469. #else
  470. return source.Aggregate(0L, (count, _) => checked(count + 1));
  471. #endif
  472. }
  473. public virtual IObservable<long> LongCount<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  474. {
  475. #if !NO_PERF
  476. return new LongCount<TSource>(source, predicate);
  477. #else
  478. return source.Where(predicate).Aggregate(0L, (count, _) => checked(count + 1));
  479. #endif
  480. }
  481. #endregion
  482. #region + Max +
  483. public virtual IObservable<TSource> Max<TSource>(IObservable<TSource> source)
  484. {
  485. #if !NO_PERF
  486. // BREAKING CHANGE v2 > v1.x - Behavior for reference types
  487. return new Max<TSource>(source, Comparer<TSource>.Default);
  488. #else
  489. return MaxBy(source, x => x).Select(x => x.First());
  490. #endif
  491. }
  492. public virtual IObservable<TSource> Max<TSource>(IObservable<TSource> source, IComparer<TSource> comparer)
  493. {
  494. #if !NO_PERF
  495. // BREAKING CHANGE v2 > v1.x - Behavior for reference types
  496. return new Max<TSource>(source, comparer);
  497. #else
  498. return MaxBy(source, x => x, comparer).Select(x => x.First());
  499. #endif
  500. }
  501. public virtual IObservable<double> Max(IObservable<double> source)
  502. {
  503. #if !NO_PERF
  504. return new MaxDouble(source);
  505. #else
  506. return source.Scan(double.MinValue, Math.Max).Final();
  507. #endif
  508. }
  509. public virtual IObservable<float> Max(IObservable<float> source)
  510. {
  511. #if !NO_PERF
  512. return new MaxSingle(source);
  513. #else
  514. return source.Scan(float.MinValue, Math.Max).Final();
  515. #endif
  516. }
  517. public virtual IObservable<decimal> Max(IObservable<decimal> source)
  518. {
  519. #if !NO_PERF
  520. return new MaxDecimal(source);
  521. #else
  522. return source.Scan(decimal.MinValue, Math.Max).Final();
  523. #endif
  524. }
  525. public virtual IObservable<int> Max(IObservable<int> source)
  526. {
  527. #if !NO_PERF
  528. return new MaxInt32(source);
  529. #else
  530. return source.Scan(int.MinValue, Math.Max).Final();
  531. #endif
  532. }
  533. public virtual IObservable<long> Max(IObservable<long> source)
  534. {
  535. #if !NO_PERF
  536. return new MaxInt64(source);
  537. #else
  538. return source.Scan(long.MinValue, Math.Max).Final();
  539. #endif
  540. }
  541. public virtual IObservable<double?> Max(IObservable<double?> source)
  542. {
  543. #if !NO_PERF
  544. return new MaxDoubleNullable(source);
  545. #else
  546. return source.Aggregate(new double?(), NullableMax);
  547. #endif
  548. }
  549. public virtual IObservable<float?> Max(IObservable<float?> source)
  550. {
  551. #if !NO_PERF
  552. return new MaxSingleNullable(source);
  553. #else
  554. return source.Aggregate(new float?(), NullableMax);
  555. #endif
  556. }
  557. public virtual IObservable<decimal?> Max(IObservable<decimal?> source)
  558. {
  559. #if !NO_PERF
  560. return new MaxDecimalNullable(source);
  561. #else
  562. return source.Aggregate(new decimal?(), NullableMax);
  563. #endif
  564. }
  565. public virtual IObservable<int?> Max(IObservable<int?> source)
  566. {
  567. #if !NO_PERF
  568. return new MaxInt32Nullable(source);
  569. #else
  570. return source.Aggregate(new int?(), NullableMax);
  571. #endif
  572. }
  573. public virtual IObservable<long?> Max(IObservable<long?> source)
  574. {
  575. #if !NO_PERF
  576. return new MaxInt64Nullable(source);
  577. #else
  578. return source.Aggregate(new long?(), NullableMax);
  579. #endif
  580. }
  581. public virtual IObservable<TResult> Max<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
  582. {
  583. return Max(Select(source, selector));
  584. }
  585. public virtual IObservable<TResult> Max<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector, IComparer<TResult> comparer)
  586. {
  587. return Max(Select(source, selector), comparer);
  588. }
  589. public virtual IObservable<double> Max<TSource>(IObservable<TSource> source, Func<TSource, double> selector)
  590. {
  591. return Max(Select(source, selector));
  592. }
  593. public virtual IObservable<float> Max<TSource>(IObservable<TSource> source, Func<TSource, float> selector)
  594. {
  595. return Max(Select(source, selector));
  596. }
  597. public virtual IObservable<decimal> Max<TSource>(IObservable<TSource> source, Func<TSource, decimal> selector)
  598. {
  599. return Max(Select(source, selector));
  600. }
  601. public virtual IObservable<int> Max<TSource>(IObservable<TSource> source, Func<TSource, int> selector)
  602. {
  603. return Max(Select(source, selector));
  604. }
  605. public virtual IObservable<long> Max<TSource>(IObservable<TSource> source, Func<TSource, long> selector)
  606. {
  607. return Max(Select(source, selector));
  608. }
  609. public virtual IObservable<double?> Max<TSource>(IObservable<TSource> source, Func<TSource, double?> selector)
  610. {
  611. return Max(Select(source, selector));
  612. }
  613. public virtual IObservable<float?> Max<TSource>(IObservable<TSource> source, Func<TSource, float?> selector)
  614. {
  615. return Max(Select(source, selector));
  616. }
  617. public virtual IObservable<decimal?> Max<TSource>(IObservable<TSource> source, Func<TSource, decimal?> selector)
  618. {
  619. return Max(Select(source, selector));
  620. }
  621. public virtual IObservable<int?> Max<TSource>(IObservable<TSource> source, Func<TSource, int?> selector)
  622. {
  623. return Max(Select(source, selector));
  624. }
  625. public virtual IObservable<long?> Max<TSource>(IObservable<TSource> source, Func<TSource, long?> selector)
  626. {
  627. return Max(Select(source, selector));
  628. }
  629. #endregion
  630. #region + MaxBy +
  631. public virtual IObservable<IList<TSource>> MaxBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  632. {
  633. #if !NO_PERF
  634. return new MaxBy<TSource, TKey>(source, keySelector, Comparer<TKey>.Default);
  635. #else
  636. return MaxBy(source, keySelector, Comparer<TKey>.Default);
  637. #endif
  638. }
  639. public virtual IObservable<IList<TSource>> MaxBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
  640. {
  641. #if !NO_PERF
  642. return new MaxBy<TSource, TKey>(source, keySelector, comparer);
  643. #else
  644. return ExtremaBy(source, keySelector, comparer);
  645. #endif
  646. }
  647. #endregion
  648. #region + Min +
  649. public virtual IObservable<TSource> Min<TSource>(IObservable<TSource> source)
  650. {
  651. #if !NO_PERF
  652. // BREAKING CHANGE v2 > v1.x - Behavior for reference types
  653. return new Min<TSource>(source, Comparer<TSource>.Default);
  654. #else
  655. return MinBy(source, x => x).Select(x => x.First());
  656. #endif
  657. }
  658. public virtual IObservable<TSource> Min<TSource>(IObservable<TSource> source, IComparer<TSource> comparer)
  659. {
  660. #if !NO_PERF
  661. // BREAKING CHANGE v2 > v1.x - Behavior for reference types
  662. return new Min<TSource>(source, comparer);
  663. #else
  664. return MinBy(source, x => x, comparer).Select(x => x.First());
  665. #endif
  666. }
  667. public virtual IObservable<double> Min(IObservable<double> source)
  668. {
  669. #if !NO_PERF
  670. return new MinDouble(source);
  671. #else
  672. return source.Scan(double.MaxValue, Math.Min).Final();
  673. #endif
  674. }
  675. public virtual IObservable<float> Min(IObservable<float> source)
  676. {
  677. #if !NO_PERF
  678. return new MinSingle(source);
  679. #else
  680. return source.Scan(float.MaxValue, Math.Min).Final();
  681. #endif
  682. }
  683. public virtual IObservable<decimal> Min(IObservable<decimal> source)
  684. {
  685. #if !NO_PERF
  686. return new MinDecimal(source);
  687. #else
  688. return source.Scan(decimal.MaxValue, Math.Min).Final();
  689. #endif
  690. }
  691. public virtual IObservable<int> Min(IObservable<int> source)
  692. {
  693. #if !NO_PERF
  694. return new MinInt32(source);
  695. #else
  696. return source.Scan(int.MaxValue, Math.Min).Final();
  697. #endif
  698. }
  699. public virtual IObservable<long> Min(IObservable<long> source)
  700. {
  701. #if !NO_PERF
  702. return new MinInt64(source);
  703. #else
  704. return source.Scan(long.MaxValue, Math.Min).Final();
  705. #endif
  706. }
  707. public virtual IObservable<double?> Min(IObservable<double?> source)
  708. {
  709. #if !NO_PERF
  710. return new MinDoubleNullable(source);
  711. #else
  712. return source.Aggregate(new double?(), NullableMin);
  713. #endif
  714. }
  715. public virtual IObservable<float?> Min(IObservable<float?> source)
  716. {
  717. #if !NO_PERF
  718. return new MinSingleNullable(source);
  719. #else
  720. return source.Aggregate(new float?(), NullableMin);
  721. #endif
  722. }
  723. public virtual IObservable<decimal?> Min(IObservable<decimal?> source)
  724. {
  725. #if !NO_PERF
  726. return new MinDecimalNullable(source);
  727. #else
  728. return source.Aggregate(new decimal?(), NullableMin);
  729. #endif
  730. }
  731. public virtual IObservable<int?> Min(IObservable<int?> source)
  732. {
  733. #if !NO_PERF
  734. return new MinInt32Nullable(source);
  735. #else
  736. return source.Aggregate(new int?(), NullableMin);
  737. #endif
  738. }
  739. public virtual IObservable<long?> Min(IObservable<long?> source)
  740. {
  741. #if !NO_PERF
  742. return new MinInt64Nullable(source);
  743. #else
  744. return source.Aggregate(new long?(), NullableMin);
  745. #endif
  746. }
  747. public virtual IObservable<TResult> Min<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
  748. {
  749. return Min(Select(source, selector));
  750. }
  751. public virtual IObservable<TResult> Min<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector, IComparer<TResult> comparer)
  752. {
  753. return Min(Select(source, selector), comparer);
  754. }
  755. public virtual IObservable<double> Min<TSource>(IObservable<TSource> source, Func<TSource, double> selector)
  756. {
  757. return Min(Select(source, selector));
  758. }
  759. public virtual IObservable<float> Min<TSource>(IObservable<TSource> source, Func<TSource, float> selector)
  760. {
  761. return Min(Select(source, selector));
  762. }
  763. public virtual IObservable<decimal> Min<TSource>(IObservable<TSource> source, Func<TSource, decimal> selector)
  764. {
  765. return Min(Select(source, selector));
  766. }
  767. public virtual IObservable<int> Min<TSource>(IObservable<TSource> source, Func<TSource, int> selector)
  768. {
  769. return Min(Select(source, selector));
  770. }
  771. public virtual IObservable<long> Min<TSource>(IObservable<TSource> source, Func<TSource, long> selector)
  772. {
  773. return Min(Select(source, selector));
  774. }
  775. public virtual IObservable<double?> Min<TSource>(IObservable<TSource> source, Func<TSource, double?> selector)
  776. {
  777. return Min(Select(source, selector));
  778. }
  779. public virtual IObservable<float?> Min<TSource>(IObservable<TSource> source, Func<TSource, float?> selector)
  780. {
  781. return Min(Select(source, selector));
  782. }
  783. public virtual IObservable<decimal?> Min<TSource>(IObservable<TSource> source, Func<TSource, decimal?> selector)
  784. {
  785. return Min(Select(source, selector));
  786. }
  787. public virtual IObservable<int?> Min<TSource>(IObservable<TSource> source, Func<TSource, int?> selector)
  788. {
  789. return Min(Select(source, selector));
  790. }
  791. public virtual IObservable<long?> Min<TSource>(IObservable<TSource> source, Func<TSource, long?> selector)
  792. {
  793. return Min(Select(source, selector));
  794. }
  795. #endregion
  796. #region + MinBy +
  797. public virtual IObservable<IList<TSource>> MinBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  798. {
  799. #if !NO_PERF
  800. return new MinBy<TSource, TKey>(source, keySelector, Comparer<TKey>.Default);
  801. #else
  802. return MinBy(source, keySelector, Comparer<TKey>.Default);
  803. #endif
  804. }
  805. public virtual IObservable<IList<TSource>> MinBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
  806. {
  807. #if !NO_PERF
  808. return new MinBy<TSource, TKey>(source, keySelector, comparer);
  809. #else
  810. return ExtremaBy(source, keySelector, new AnonymousComparer<TKey>((x, y) => comparer.Compare(x, y) * -1));
  811. #endif
  812. }
  813. #endregion
  814. #region + SequenceEqual +
  815. public virtual IObservable<bool> SequenceEqual<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  816. {
  817. #if !NO_PERF
  818. return new SequenceEqual<TSource>(first, second, EqualityComparer<TSource>.Default);
  819. #else
  820. return first.SequenceEqual(second, EqualityComparer<TSource>.Default);
  821. #endif
  822. }
  823. public virtual IObservable<bool> SequenceEqual<TSource>(IObservable<TSource> first, IObservable<TSource> second, IEqualityComparer<TSource> comparer)
  824. {
  825. #if !NO_PERF
  826. return new SequenceEqual<TSource>(first, second, comparer);
  827. #else
  828. return new AnonymousObservable<bool>(observer =>
  829. {
  830. var gate = new object();
  831. var donel = false;
  832. var doner = false;
  833. var ql = new Queue<TSource>();
  834. var qr = new Queue<TSource>();
  835. var subscription1 = first.Subscribe(
  836. x =>
  837. {
  838. lock (gate)
  839. {
  840. if (qr.Count > 0)
  841. {
  842. var equal = false;
  843. var v = qr.Dequeue();
  844. try
  845. {
  846. equal = comparer.Equals(x, v);
  847. }
  848. catch (Exception exception)
  849. {
  850. observer.OnError(exception);
  851. return;
  852. }
  853. if (!equal)
  854. {
  855. observer.OnNext(false);
  856. observer.OnCompleted();
  857. }
  858. }
  859. else if (doner)
  860. {
  861. observer.OnNext(false);
  862. observer.OnCompleted();
  863. }
  864. else
  865. ql.Enqueue(x);
  866. }
  867. },
  868. observer.OnError,
  869. () =>
  870. {
  871. lock (gate)
  872. {
  873. donel = true;
  874. if (ql.Count == 0)
  875. {
  876. if (qr.Count > 0)
  877. {
  878. observer.OnNext(false);
  879. observer.OnCompleted();
  880. }
  881. else if (doner)
  882. {
  883. observer.OnNext(true);
  884. observer.OnCompleted();
  885. }
  886. }
  887. }
  888. });
  889. var subscription2 = second.Subscribe(
  890. x =>
  891. {
  892. lock (gate)
  893. {
  894. if (ql.Count > 0)
  895. {
  896. var equal = false;
  897. var v = ql.Dequeue();
  898. try
  899. {
  900. equal = comparer.Equals(v, x);
  901. }
  902. catch (Exception exception)
  903. {
  904. observer.OnError(exception);
  905. return;
  906. }
  907. if (!equal)
  908. {
  909. observer.OnNext(false);
  910. observer.OnCompleted();
  911. }
  912. }
  913. else if (donel)
  914. {
  915. observer.OnNext(false);
  916. observer.OnCompleted();
  917. }
  918. else
  919. qr.Enqueue(x);
  920. }
  921. },
  922. observer.OnError,
  923. () =>
  924. {
  925. lock (gate)
  926. {
  927. doner = true;
  928. if (qr.Count == 0)
  929. {
  930. if (ql.Count > 0)
  931. {
  932. observer.OnNext(false);
  933. observer.OnCompleted();
  934. }
  935. else if (donel)
  936. {
  937. observer.OnNext(true);
  938. observer.OnCompleted();
  939. }
  940. }
  941. }
  942. });
  943. return new CompositeDisposable(subscription1, subscription2);
  944. });
  945. #endif
  946. }
  947. public virtual IObservable<bool> SequenceEqual<TSource>(IObservable<TSource> first, IEnumerable<TSource> second)
  948. {
  949. #if !NO_PERF
  950. return new SequenceEqual<TSource>(first, second, EqualityComparer<TSource>.Default);
  951. #else
  952. return SequenceEqual<TSource>(first, second, EqualityComparer<TSource>.Default);
  953. #endif
  954. }
  955. public virtual IObservable<bool> SequenceEqual<TSource>(IObservable<TSource> first, IEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
  956. {
  957. #if !NO_PERF
  958. return new SequenceEqual<TSource>(first, second, comparer);
  959. #else
  960. return new AnonymousObservable<bool>(observer =>
  961. {
  962. var e = default(IEnumerator<TSource>);
  963. try
  964. {
  965. e = second.GetEnumerator();
  966. }
  967. catch (Exception ex)
  968. {
  969. observer.OnError(ex);
  970. return Disposable.Empty;
  971. }
  972. return new CompositeDisposable(
  973. first.Subscribe(
  974. value =>
  975. {
  976. var equal = false;
  977. try
  978. {
  979. var hasNext = e.MoveNext();
  980. if (hasNext)
  981. {
  982. var current = e.Current;
  983. equal = comparer.Equals(value, current);
  984. }
  985. }
  986. catch (Exception ex)
  987. {
  988. observer.OnError(ex);
  989. return;
  990. }
  991. if (!equal)
  992. {
  993. observer.OnNext(false);
  994. observer.OnCompleted();
  995. }
  996. },
  997. observer.OnError,
  998. () =>
  999. {
  1000. var hasNext = false;
  1001. try
  1002. {
  1003. hasNext = e.MoveNext();
  1004. }
  1005. catch (Exception exception)
  1006. {
  1007. observer.OnError(exception);
  1008. return;
  1009. }
  1010. observer.OnNext(!hasNext);
  1011. observer.OnCompleted();
  1012. }
  1013. ),
  1014. e
  1015. );
  1016. });
  1017. #endif
  1018. }
  1019. #endregion
  1020. #region + SingleAsync +
  1021. public virtual IObservable<TSource> SingleAsync<TSource>(IObservable<TSource> source)
  1022. {
  1023. #if !NO_PERF
  1024. return new SingleAsync<TSource>(source, null, true);
  1025. #else
  1026. return SingleOrDefaultAsync_(source, true);
  1027. #endif
  1028. }
  1029. public virtual IObservable<TSource> SingleAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  1030. {
  1031. #if !NO_PERF
  1032. return new SingleAsync<TSource>(source, predicate, true);
  1033. #else
  1034. return source.Where(predicate).SingleAsync();
  1035. #endif
  1036. }
  1037. #endregion
  1038. #region + SingleOrDefaultAsync +
  1039. public virtual IObservable<TSource> SingleOrDefaultAsync<TSource>(IObservable<TSource> source)
  1040. {
  1041. #if !NO_PERF
  1042. return new SingleAsync<TSource>(source, null, false);
  1043. #else
  1044. return SingleOrDefaultAsync_(source, false);
  1045. #endif
  1046. }
  1047. public virtual IObservable<TSource> SingleOrDefaultAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  1048. {
  1049. #if !NO_PERF
  1050. return new SingleAsync<TSource>(source, predicate, false);
  1051. #else
  1052. return source.Where(predicate).SingleOrDefaultAsync();
  1053. #endif
  1054. }
  1055. #if NO_PERF
  1056. private static IObservable<TSource> SingleOrDefaultAsync_<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  1057. {
  1058. return new AnonymousObservable<TSource>(observer =>
  1059. {
  1060. var value = default(TSource);
  1061. var seenValue = false;
  1062. return source.Subscribe(
  1063. x =>
  1064. {
  1065. if (seenValue)
  1066. {
  1067. observer.OnError(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT));
  1068. }
  1069. else
  1070. {
  1071. value = x;
  1072. seenValue = true;
  1073. }
  1074. },
  1075. observer.OnError,
  1076. () =>
  1077. {
  1078. if (throwOnEmpty && !seenValue)
  1079. {
  1080. observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
  1081. }
  1082. else
  1083. {
  1084. observer.OnNext(value);
  1085. observer.OnCompleted();
  1086. }
  1087. }
  1088. );
  1089. });
  1090. }
  1091. #endif
  1092. #endregion
  1093. #region + Sum +
  1094. public virtual IObservable<double> Sum(IObservable<double> source)
  1095. {
  1096. #if !NO_PERF
  1097. return new SumDouble(source);
  1098. #else
  1099. return source.Aggregate(0.0, (prev, curr) => prev + curr);
  1100. #endif
  1101. }
  1102. public virtual IObservable<float> Sum(IObservable<float> source)
  1103. {
  1104. #if !NO_PERF
  1105. return new SumSingle(source);
  1106. #else
  1107. return source.Aggregate(0f, (prev, curr) => prev + curr);
  1108. #endif
  1109. }
  1110. public virtual IObservable<decimal> Sum(IObservable<decimal> source)
  1111. {
  1112. #if !NO_PERF
  1113. return new SumDecimal(source);
  1114. #else
  1115. return source.Aggregate(0M, (prev, curr) => prev + curr);
  1116. #endif
  1117. }
  1118. public virtual IObservable<int> Sum(IObservable<int> source)
  1119. {
  1120. #if !NO_PERF
  1121. return new SumInt32(source);
  1122. #else
  1123. return source.Aggregate(0, (prev, curr) => checked(prev + curr));
  1124. #endif
  1125. }
  1126. public virtual IObservable<long> Sum(IObservable<long> source)
  1127. {
  1128. #if !NO_PERF
  1129. return new SumInt64(source);
  1130. #else
  1131. return source.Aggregate(0L, (prev, curr) => checked(prev + curr));
  1132. #endif
  1133. }
  1134. public virtual IObservable<double?> Sum(IObservable<double?> source)
  1135. {
  1136. #if !NO_PERF
  1137. return new SumDoubleNullable(source);
  1138. #else
  1139. return source.Aggregate(0.0, (prev, curr) => prev + curr.GetValueOrDefault()).Select(x => (double?)x);
  1140. #endif
  1141. }
  1142. public virtual IObservable<float?> Sum(IObservable<float?> source)
  1143. {
  1144. #if !NO_PERF
  1145. return new SumSingleNullable(source);
  1146. #else
  1147. return source.Aggregate(0f, (prev, curr) => prev + curr.GetValueOrDefault()).Select(x => (float?)x);
  1148. #endif
  1149. }
  1150. public virtual IObservable<decimal?> Sum(IObservable<decimal?> source)
  1151. {
  1152. #if !NO_PERF
  1153. return new SumDecimalNullable(source);
  1154. #else
  1155. return source.Aggregate(0M, (prev, curr) => prev + curr.GetValueOrDefault()).Select(x => (decimal?)x);
  1156. #endif
  1157. }
  1158. public virtual IObservable<int?> Sum(IObservable<int?> source)
  1159. {
  1160. #if !NO_PERF
  1161. return new SumInt32Nullable(source);
  1162. #else
  1163. return source.Aggregate(0, (prev, curr) => checked(prev + curr.GetValueOrDefault())).Select(x => (int?)x);
  1164. #endif
  1165. }
  1166. public virtual IObservable<long?> Sum(IObservable<long?> source)
  1167. {
  1168. #if !NO_PERF
  1169. return new SumInt64Nullable(source);
  1170. #else
  1171. return source.Aggregate(0L, (prev, curr) => checked(prev + curr.GetValueOrDefault())).Select(x => (long?)x);
  1172. #endif
  1173. }
  1174. public virtual IObservable<double> Sum<TSource>(IObservable<TSource> source, Func<TSource, double> selector)
  1175. {
  1176. return Sum(Select(source, selector));
  1177. }
  1178. public virtual IObservable<float> Sum<TSource>(IObservable<TSource> source, Func<TSource, float> selector)
  1179. {
  1180. return Sum(Select(source, selector));
  1181. }
  1182. public virtual IObservable<decimal> Sum<TSource>(IObservable<TSource> source, Func<TSource, decimal> selector)
  1183. {
  1184. return Sum(Select(source, selector));
  1185. }
  1186. public virtual IObservable<int> Sum<TSource>(IObservable<TSource> source, Func<TSource, int> selector)
  1187. {
  1188. return Sum(Select(source, selector));
  1189. }
  1190. public virtual IObservable<long> Sum<TSource>(IObservable<TSource> source, Func<TSource, long> selector)
  1191. {
  1192. return Sum(Select(source, selector));
  1193. }
  1194. public virtual IObservable<double?> Sum<TSource>(IObservable<TSource> source, Func<TSource, double?> selector)
  1195. {
  1196. return Sum(Select(source, selector));
  1197. }
  1198. public virtual IObservable<float?> Sum<TSource>(IObservable<TSource> source, Func<TSource, float?> selector)
  1199. {
  1200. return Sum(Select(source, selector));
  1201. }
  1202. public virtual IObservable<decimal?> Sum<TSource>(IObservable<TSource> source, Func<TSource, decimal?> selector)
  1203. {
  1204. return Sum(Select(source, selector));
  1205. }
  1206. public virtual IObservable<int?> Sum<TSource>(IObservable<TSource> source, Func<TSource, int?> selector)
  1207. {
  1208. return Sum(Select(source, selector));
  1209. }
  1210. public virtual IObservable<long?> Sum<TSource>(IObservable<TSource> source, Func<TSource, long?> selector)
  1211. {
  1212. return Sum(Select(source, selector));
  1213. }
  1214. #endregion
  1215. #region + ToArray +
  1216. public virtual IObservable<TSource[]> ToArray<TSource>(IObservable<TSource> source)
  1217. {
  1218. #if !NO_PERF
  1219. return new ToArray<TSource>(source);
  1220. #else
  1221. return source.ToList().Select(xs => xs.ToArray());
  1222. #endif
  1223. }
  1224. #endregion
  1225. #region + ToDictionary +
  1226. public virtual IObservable<IDictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  1227. {
  1228. #if !NO_PERF
  1229. return new ToDictionary<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
  1230. #else
  1231. return source.Aggregate((IDictionary<TKey, TElement>)new Dictionary<TKey, TElement>(comparer), (dict, x) =>
  1232. {
  1233. dict.Add(keySelector(x), elementSelector(x));
  1234. return dict;
  1235. });
  1236. #endif
  1237. }
  1238. public virtual IObservable<IDictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  1239. {
  1240. #if !NO_PERF
  1241. return new ToDictionary<TSource, TKey, TElement>(source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
  1242. #else
  1243. return source.ToDictionary(keySelector, elementSelector, EqualityComparer<TKey>.Default);
  1244. #endif
  1245. }
  1246. public virtual IObservable<IDictionary<TKey, TSource>> ToDictionary<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  1247. {
  1248. #if !NO_PERF
  1249. return new ToDictionary<TSource, TKey, TSource>(source, keySelector, x => x, comparer);
  1250. #else
  1251. return source.ToDictionary(keySelector, x => x, comparer);
  1252. #endif
  1253. }
  1254. public virtual IObservable<IDictionary<TKey, TSource>> ToDictionary<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  1255. {
  1256. #if !NO_PERF
  1257. return new ToDictionary<TSource, TKey, TSource>(source, keySelector, x => x, EqualityComparer<TKey>.Default);
  1258. #else
  1259. return source.ToDictionary(keySelector, x => x, EqualityComparer<TKey>.Default);
  1260. #endif
  1261. }
  1262. #endregion
  1263. #region + ToList +
  1264. public virtual IObservable<IList<TSource>> ToList<TSource>(IObservable<TSource> source)
  1265. {
  1266. #if !NO_PERF
  1267. return new ToList<TSource>(source);
  1268. #else
  1269. return source.Aggregate((IList<TSource>)new List<TSource>(), (list, x) =>
  1270. {
  1271. list.Add(x);
  1272. return list;
  1273. });
  1274. #endif
  1275. }
  1276. #endregion
  1277. #region + ToLookup +
  1278. public virtual IObservable<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  1279. {
  1280. #if !NO_PERF
  1281. return new ToLookup<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
  1282. #else
  1283. return source.Aggregate(new Lookup<TKey, TElement>(comparer), (lookup, x) =>
  1284. {
  1285. lookup.Add(keySelector(x), elementSelector(x));
  1286. return lookup;
  1287. }).Select(xs => (ILookup<TKey, TElement>)xs);
  1288. #endif
  1289. }
  1290. public virtual IObservable<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  1291. {
  1292. #if !NO_PERF
  1293. return new ToLookup<TSource, TKey, TSource>(source, keySelector, x => x, comparer);
  1294. #else
  1295. return source.ToLookup(keySelector, x => x, comparer);
  1296. #endif
  1297. }
  1298. public virtual IObservable<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  1299. {
  1300. #if !NO_PERF
  1301. return new ToLookup<TSource, TKey, TElement>(source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
  1302. #else
  1303. return source.ToLookup(keySelector, elementSelector, EqualityComparer<TKey>.Default);
  1304. #endif
  1305. }
  1306. public virtual IObservable<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  1307. {
  1308. #if !NO_PERF
  1309. return new ToLookup<TSource, TKey, TSource>(source, keySelector, x => x, EqualityComparer<TKey>.Default);
  1310. #else
  1311. return source.ToLookup(keySelector, x => x, EqualityComparer<TKey>.Default);
  1312. #endif
  1313. }
  1314. #endregion
  1315. #region |> Helpers <|
  1316. #if NO_PERF
  1317. private static T? NullableMin<T>(T? x, T? y)
  1318. where T : struct, IComparable<T>
  1319. {
  1320. if (!x.HasValue)
  1321. return y;
  1322. if (!y.HasValue)
  1323. return x;
  1324. if (x.Value.CompareTo(y.Value) <= 0)
  1325. return x;
  1326. return y;
  1327. }
  1328. private static T? NullableMax<T>(T? x, T? y)
  1329. where T : struct, IComparable<T>
  1330. {
  1331. if (!x.HasValue)
  1332. return y;
  1333. if (!y.HasValue)
  1334. return x;
  1335. if (x.Value.CompareTo(y.Value) >= 0)
  1336. return x;
  1337. return y;
  1338. }
  1339. private static IObservable<IList<TSource>> ExtremaBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
  1340. {
  1341. return new AnonymousObservable<IList<TSource>>(observer =>
  1342. {
  1343. var hasValue = false;
  1344. var lastKey = default(TKey);
  1345. var list = new List<TSource>();
  1346. return source.Subscribe(
  1347. x =>
  1348. {
  1349. var key = default(TKey);
  1350. try
  1351. {
  1352. key = keySelector(x);
  1353. }
  1354. catch (Exception ex)
  1355. {
  1356. observer.OnError(ex);
  1357. return;
  1358. }
  1359. var comparison = 0;
  1360. if (!hasValue)
  1361. {
  1362. hasValue = true;
  1363. lastKey = key;
  1364. }
  1365. else
  1366. {
  1367. try
  1368. {
  1369. comparison = comparer.Compare(key, lastKey);
  1370. }
  1371. catch (Exception ex)
  1372. {
  1373. observer.OnError(ex);
  1374. return;
  1375. }
  1376. }
  1377. if (comparison > 0)
  1378. {
  1379. lastKey = key;
  1380. list.Clear();
  1381. }
  1382. if (comparison >= 0)
  1383. {
  1384. list.Add(x);
  1385. }
  1386. },
  1387. observer.OnError,
  1388. () =>
  1389. {
  1390. observer.OnNext(list);
  1391. observer.OnCompleted();
  1392. }
  1393. );
  1394. });
  1395. }
  1396. #endif
  1397. #endregion
  1398. }
  1399. #region |> Helper types <|
  1400. #if NO_PERF
  1401. static class AggregateExtensions
  1402. {
  1403. public static IObservable<TSource> Final<TSource>(this IObservable<TSource> source)
  1404. {
  1405. return new AnonymousObservable<TSource>(observer =>
  1406. {
  1407. var value = default(TSource);
  1408. var hasValue = false;
  1409. return source.Subscribe(
  1410. x =>
  1411. {
  1412. hasValue = true;
  1413. value = x;
  1414. },
  1415. observer.OnError,
  1416. () =>
  1417. {
  1418. if (!hasValue)
  1419. observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
  1420. else
  1421. {
  1422. observer.OnNext(value);
  1423. observer.OnCompleted();
  1424. }
  1425. });
  1426. });
  1427. }
  1428. }
  1429. sealed class AnonymousComparer<T> : IComparer<T>
  1430. {
  1431. private readonly Func<T, T, int> comparer;
  1432. /// <summary>
  1433. /// Creates an instance of IComparer by providing a method that compares two objects.
  1434. /// </summary>
  1435. public AnonymousComparer(Func<T, T, int> comparer)
  1436. {
  1437. this.comparer = comparer;
  1438. }
  1439. /// <summary>
  1440. /// Compares two objects and returns a value indicating whether one is less than, equal to, or greater than the other.
  1441. /// </summary>
  1442. public int Compare(T x, T y)
  1443. {
  1444. return comparer(x, y);
  1445. }
  1446. }
  1447. #endif
  1448. #endregion
  1449. }