1
0

QueryLanguage.Aggregates.cs 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive.Linq
  6. {
  7. #if !NO_PERF
  8. using ObservableImpl;
  9. #endif
  10. internal partial class QueryLanguage
  11. {
  12. #region + Aggregate +
  /// <summary>
  /// Aggregates <paramref name="source"/> starting from <paramref name="seed"/> using <paramref name="accumulator"/>.
  /// </summary>
  public virtual IObservable<TAccumulate> Aggregate<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  {
#if NO_PERF
      // Fallback: running Scan values, prefixed with the seed, reduced by Final().
      var running = source.Scan(seed, accumulator);
      var seeded = running.StartWith(seed);
      return seeded.Final();
#else
      // Dedicated operator; Stubs<TAccumulate>.I is supplied in the result-selector position.
      return new Aggregate<TSource, TAccumulate, TAccumulate>(source, seed, accumulator, Stubs<TAccumulate>.I);
#endif
  }
  /// <summary>
  /// Aggregates <paramref name="source"/> from <paramref name="seed"/> with <paramref name="accumulator"/>
  /// and passes the outcome through <paramref name="resultSelector"/>.
  /// </summary>
  public virtual IObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
  {
#if NO_PERF
      // Fallback: reuse the seeded overload, then project its output.
      var accumulated = Aggregate(source, seed, accumulator);
      return accumulated.Select(resultSelector);
#else
      return new Aggregate<TSource, TAccumulate, TResult>(source, seed, accumulator, resultSelector);
#endif
  }
  /// <summary>
  /// Seedless aggregation of <paramref name="source"/> using <paramref name="accumulator"/>.
  /// </summary>
  public virtual IObservable<TSource> Aggregate<TSource>(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  {
#if NO_PERF
      // Fallback: seedless Scan reduced by Final().
      var running = source.Scan(accumulator);
      return running.Final();
#else
      return new Aggregate<TSource>(source, accumulator);
#endif
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (double) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<double> Average<TSource>(IObservable<TSource> source, Func<TSource, double> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (float) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<float> Average<TSource>(IObservable<TSource> source, Func<TSource, float> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (decimal) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<decimal> Average<TSource>(IObservable<TSource> source, Func<TSource, decimal> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (int) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<double> Average<TSource>(IObservable<TSource> source, Func<TSource, int> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (long) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<double> Average<TSource>(IObservable<TSource> source, Func<TSource, long> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (double?) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<double?> Average<TSource>(IObservable<TSource> source, Func<TSource, double?> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (float?) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<float?> Average<TSource>(IObservable<TSource> source, Func<TSource, float?> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (decimal?) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<decimal?> Average<TSource>(IObservable<TSource> source, Func<TSource, decimal?> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (int?) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<double?> Average<TSource>(IObservable<TSource> source, Func<TSource, int?> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (long?) and delegates to the matching Average overload.
  /// </summary>
  public virtual IObservable<double?> Average<TSource>(IObservable<TSource> source, Func<TSource, long?> selector)
  {
      var projected = Select(source, selector);
      return Average(projected);
  }
  77. #endregion
  78. #region + All +
  /// <summary>
  /// Produces a boolean sequence for whether all elements of <paramref name="source"/> satisfy <paramref name="predicate"/>.
  /// </summary>
  public virtual IObservable<bool> All<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      // Fallback: ask whether any element fails the predicate, then negate the answer.
      var failures = source.Where(item => !(predicate(item)));
      return failures.Any().Select(anyFailed => !anyFailed);
#else
      return new All<TSource>(source, predicate);
#endif
  }
  87. #endregion
  88. #region + Any +
  /// <summary>
  /// Produces a boolean sequence for whether <paramref name="source"/> has any element.
  /// </summary>
  public virtual IObservable<bool> Any<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      return new AnonymousObservable<bool>(observer =>
      {
          // First element: answer true and finish.
          Action<TSource> onNext = ignored =>
          {
              observer.OnNext(true);
              observer.OnCompleted();
          };

          // Source completed without the handler above finishing the observer: answer false.
          Action onCompleted = () =>
          {
              observer.OnNext(false);
              observer.OnCompleted();
          };

          return source.Subscribe(onNext, observer.OnError, onCompleted);
      });
#else
      return new Any<TSource>(source);
#endif
  }
  /// <summary>
  /// Produces a boolean sequence for whether any element of <paramref name="source"/> satisfies <paramref name="predicate"/>.
  /// </summary>
  public virtual IObservable<bool> Any<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      // Fallback: filter first, then use the parameterless Any.
      var matching = source.Where(predicate);
      return matching.Any();
#else
      return new Any<TSource>(source, predicate);
#endif
  }
  116. #endregion
  117. #region + Average +
  /// <summary>
  /// Averages a sequence of double values.
  /// </summary>
  public virtual IObservable<double> Average(IObservable<double> source)
  {
#if NO_PERF
      // Fallback: track a running (sum, count) pair; the count increment is overflow-checked.
      var start = new { sum = 0.0, count = 0L };
      var running = source.Scan(start,
          (acc, value) => new { sum = acc.sum + value, count = checked(acc.count + 1) });
      return running.Final().Select(acc => acc.sum / (double)acc.count);
#else
      return new AverageDouble(source);
#endif
  }
  /// <summary>
  /// Averages a sequence of float values.
  /// </summary>
  public virtual IObservable<float> Average(IObservable<float> source)
  {
#if NO_PERF
      // NOTE: Accumulates in float, which does *not* conform to LINQ to Objects.
      var start = new { sum = 0F, count = 0L };
      var running = source.Scan(start,
          (acc, value) => new { sum = acc.sum + value, count = checked(acc.count + 1) });
      return running.Final().Select(acc => acc.sum / (float)acc.count);
#else
      return new AverageSingle(source);
#endif
  }
  /// <summary>
  /// Averages a sequence of decimal values.
  /// </summary>
  public virtual IObservable<decimal> Average(IObservable<decimal> source)
  {
#if NO_PERF
      // Fallback: track a running (sum, count) pair; the count increment is overflow-checked.
      var start = new { sum = 0M, count = 0L };
      var running = source.Scan(start,
          (acc, value) => new { sum = acc.sum + value, count = checked(acc.count + 1) });
      return running.Final().Select(acc => acc.sum / (decimal)acc.count);
#else
      return new AverageDecimal(source);
#endif
  }
  /// <summary>
  /// Averages a sequence of int values, producing a double.
  /// </summary>
  public virtual IObservable<double> Average(IObservable<int> source)
  {
#if NO_PERF
      // Fallback: sum and count are both kept as long and updated with overflow checking.
      var start = new { sum = 0L, count = 0L };
      var running = source.Scan(start,
          (acc, value) => new { sum = checked(acc.sum + value), count = checked(acc.count + 1) });
      return running.Final().Select(acc => (double)acc.sum / (double)acc.count);
#else
      return new AverageInt32(source);
#endif
  }
  /// <summary>
  /// Averages a sequence of long values, producing a double.
  /// </summary>
  public virtual IObservable<double> Average(IObservable<long> source)
  {
#if NO_PERF
      // Fallback: sum and count are both kept as long and updated with overflow checking.
      var start = new { sum = 0L, count = 0L };
      var running = source.Scan(start,
          (acc, value) => new { sum = checked(acc.sum + value), count = checked(acc.count + 1) });
      return running.Final().Select(acc => (double)acc.sum / (double)acc.count);
#else
      return new AverageInt64(source);
#endif
  }
  /// <summary>
  /// Averages a sequence of nullable double values.
  /// </summary>
  public virtual IObservable<double?> Average(IObservable<double?> source)
  {
#if NO_PERF
      // Fallback: null elements leave the accumulator untouched; a zero count maps to null.
      var start = new { sum = new double?(0.0), count = 0L };
      var totals = source.Aggregate(start,
          (acc, value) => value == null
              ? acc
              : new { sum = acc.sum + value.GetValueOrDefault(), count = checked(acc.count + 1) });
      return totals.Select(acc => acc.count != 0 ? (double?)acc.sum / (double)acc.count : default(double?));
#else
      return new AverageDoubleNullable(source);
#endif
  }
  /// <summary>
  /// Averages a sequence of nullable float values.
  /// </summary>
  public virtual IObservable<float?> Average(IObservable<float?> source)
  {
#if NO_PERF
      // NOTE: Accumulates in float, which does *not* conform to LINQ to Objects.
      // Null elements leave the accumulator untouched; a zero count maps to null.
      var start = new { sum = new float?(0f), count = 0L };
      var totals = source.Aggregate(start,
          (acc, value) => value == null
              ? acc
              : new { sum = acc.sum + value.GetValueOrDefault(), count = checked(acc.count + 1) });
      return totals.Select(acc => acc.count != 0 ? (float?)acc.sum / (float)acc.count : default(float?));
#else
      return new AverageSingleNullable(source);
#endif
  }
  /// <summary>
  /// Averages a sequence of nullable decimal values.
  /// </summary>
  public virtual IObservable<decimal?> Average(IObservable<decimal?> source)
  {
#if NO_PERF
      // Fallback: null elements leave the accumulator untouched; a zero count maps to null.
      var start = new { sum = new decimal?(0M), count = 0L };
      var totals = source.Aggregate(start,
          (acc, value) => value == null
              ? acc
              : new { sum = acc.sum + value.GetValueOrDefault(), count = checked(acc.count + 1) });
      return totals.Select(acc => acc.count != 0 ? (decimal?)acc.sum / (decimal)acc.count : default(decimal?));
#else
      return new AverageDecimalNullable(source);
#endif
  }
  /// <summary>
  /// Averages a sequence of nullable int values, producing a nullable double.
  /// </summary>
  public virtual IObservable<double?> Average(IObservable<int?> source)
  {
#if NO_PERF
      // Fallback: the sum is a long? updated with overflow checking; nulls are skipped; a zero count maps to null.
      var start = new { sum = new long?(0), count = 0L };
      var totals = source.Aggregate(start,
          (acc, value) => value == null
              ? acc
              : new { sum = checked(acc.sum + value.GetValueOrDefault()), count = checked(acc.count + 1) });
      return totals.Select(acc => acc.count != 0 ? (double?)acc.sum / acc.count : default(double?));
#else
      return new AverageInt32Nullable(source);
#endif
  }
  /// <summary>
  /// Averages a sequence of nullable long values, producing a nullable double.
  /// </summary>
  public virtual IObservable<double?> Average(IObservable<long?> source)
  {
#if NO_PERF
      // Fallback: the sum is a long? updated with overflow checking; nulls are skipped; a zero count maps to null.
      var start = new { sum = new long?(0), count = 0L };
      var totals = source.Aggregate(start,
          (acc, value) => value == null
              ? acc
              : new { sum = checked(acc.sum + value.GetValueOrDefault()), count = checked(acc.count + 1) });
      return totals.Select(acc => acc.count != 0 ? (double?)acc.sum / acc.count : default(double?));
#else
      return new AverageInt64Nullable(source);
#endif
  }
  223. #endregion
  224. #region + Contains +
  /// <summary>
  /// Checks <paramref name="source"/> for <paramref name="value"/> using <c>EqualityComparer&lt;TSource&gt;.Default</c>.
  /// </summary>
  public virtual IObservable<bool> Contains<TSource>(IObservable<TSource> source, TSource value)
  {
#if NO_PERF
      return Contains_<TSource>(source, value, EqualityComparer<TSource>.Default);
#else
      return new Contains<TSource>(source, value, EqualityComparer<TSource>.Default);
#endif
  }
  /// <summary>
  /// Checks <paramref name="source"/> for <paramref name="value"/> using the supplied <paramref name="comparer"/>.
  /// </summary>
  public virtual IObservable<bool> Contains<TSource>(IObservable<TSource> source, TSource value, IEqualityComparer<TSource> comparer)
  {
#if NO_PERF
      return Contains_<TSource>(source, value, comparer);
#else
      return new Contains<TSource>(source, value, comparer);
#endif
  }
  241. #if NO_PERF
  // Shared NO_PERF implementation: keep the elements equal to 'value' under 'comparer', then ask Any().
  private static IObservable<bool> Contains_<TSource>(IObservable<TSource> source, TSource value, IEqualityComparer<TSource> comparer)
  {
      var matches = source.Where(candidate => comparer.Equals(candidate, value));
      return matches.Any();
  }
  246. #endif
  247. #endregion
  248. #region + Count +
  /// <summary>
  /// Counts the elements of <paramref name="source"/> as an int.
  /// </summary>
  public virtual IObservable<int> Count<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      // Fallback: overflow-checked increment per element, starting at 0.
      return source.Aggregate(0, (tally, ignored) => checked(tally + 1));
#else
      return new Count<TSource>(source);
#endif
  }
  /// <summary>
  /// Counts the elements of <paramref name="source"/> that satisfy <paramref name="predicate"/> as an int.
  /// </summary>
  public virtual IObservable<int> Count<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      // Fallback: filter, then overflow-checked increment per remaining element.
      var matching = source.Where(predicate);
      return matching.Aggregate(0, (tally, ignored) => checked(tally + 1));
#else
      return new Count<TSource>(source, predicate);
#endif
  }
  265. #endregion
  266. #region + ElementAt +
  /// <summary>
  /// Yields the element of <paramref name="source"/> at position <paramref name="index"/>.
  /// </summary>
  public virtual IObservable<TSource> ElementAt<TSource>(IObservable<TSource> source, int index)
  {
#if NO_PERF
      return new AnonymousObservable<TSource>(observer =>
      {
          // Countdown to the requested position; one counter per subscription.
          int remaining = index;

          Action<TSource> onNext = item =>
          {
              if (remaining == 0)
              {
                  observer.OnNext(item);
                  observer.OnCompleted();
              }

              remaining--;
          };

          // Completion is reported as an out-of-range index.
          Action onCompleted = () => observer.OnError(new ArgumentOutOfRangeException("index"));

          return source.Subscribe(onNext, observer.OnError, onCompleted);
      });
#else
      return new ElementAt<TSource>(source, index, true);
#endif
  }
  291. #endregion
  292. #region + ElementAtOrDefault +
  /// <summary>
  /// Yields the element of <paramref name="source"/> at position <paramref name="index"/>, or <c>default(TSource)</c>.
  /// </summary>
  public virtual IObservable<TSource> ElementAtOrDefault<TSource>(IObservable<TSource> source, int index)
  {
#if NO_PERF
      return new AnonymousObservable<TSource>(observer =>
      {
          // Countdown to the requested position; one counter per subscription.
          int remaining = index;

          Action<TSource> onNext = item =>
          {
              if (remaining == 0)
              {
                  observer.OnNext(item);
                  observer.OnCompleted();
              }

              remaining--;
          };

          // Completion is answered with the default value.
          Action onCompleted = () =>
          {
              observer.OnNext(default(TSource));
              observer.OnCompleted();
          };

          return source.Subscribe(onNext, observer.OnError, onCompleted);
      });
#else
      return new ElementAt<TSource>(source, index, false);
#endif
  }
  321. #endregion
  322. #region + FirstAsync +
  /// <summary>
  /// First element of <paramref name="source"/>; the "throw on empty" flag is set to true.
  /// </summary>
  public virtual IObservable<TSource> FirstAsync<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      return FirstOrDefaultAsync_(source, true);
#else
      // No predicate (null).
      return new FirstAsync<TSource>(source, null, true);
#endif
  }
  /// <summary>
  /// First element of <paramref name="source"/> satisfying <paramref name="predicate"/>.
  /// </summary>
  public virtual IObservable<TSource> FirstAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      // Fallback: filter, then take the first of what remains.
      var matching = source.Where(predicate);
      return matching.FirstAsync();
#else
      return new FirstAsync<TSource>(source, predicate, true);
#endif
  }
  339. #endregion
  340. #region + FirstOrDefaultAsync +
  /// <summary>
  /// First element of <paramref name="source"/>; the "throw on empty" flag is set to false.
  /// </summary>
  public virtual IObservable<TSource> FirstOrDefaultAsync<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      return FirstOrDefaultAsync_(source, false);
#else
      // No predicate (null).
      return new FirstAsync<TSource>(source, null, false);
#endif
  }
  /// <summary>
  /// First element of <paramref name="source"/> satisfying <paramref name="predicate"/>, non-throwing variant.
  /// </summary>
  public virtual IObservable<TSource> FirstOrDefaultAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      // Fallback: filter, then take the first-or-default of what remains.
      var matching = source.Where(predicate);
      return matching.FirstOrDefaultAsync();
#else
      return new FirstAsync<TSource>(source, predicate, false);
#endif
  }
  357. #if NO_PERF
  // Shared NO_PERF implementation for FirstAsync / FirstOrDefaultAsync.
  // The first element is forwarded and the observer completed. If the source completes instead,
  // the observer gets an InvalidOperationException when throwOnEmpty is set, otherwise default(TSource).
  private static IObservable<TSource> FirstOrDefaultAsync_<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  {
      return new AnonymousObservable<TSource>(observer =>
      {
          Action<TSource> onNext = item =>
          {
              observer.OnNext(item);
              observer.OnCompleted();
          };

          Action onCompleted = () =>
          {
              if (!throwOnEmpty)
              {
                  observer.OnNext(default(TSource));
                  observer.OnCompleted();
                  return;
              }

              observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
          };

          return source.Subscribe(onNext, observer.OnError, onCompleted);
      });
  }
  384. #endif
  385. #endregion
  386. #region + IsEmpty +
  /// <summary>
  /// Produces a boolean sequence for whether <paramref name="source"/> is empty.
  /// </summary>
  public virtual IObservable<bool> IsEmpty<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      // Fallback: the negation of Any().
      var hasAny = source.Any();
      return hasAny.Select(found => !found);
#else
      return new IsEmpty<TSource>(source);
#endif
  }
  395. #endregion
  396. #region + LastAsync +
  /// <summary>
  /// Last element of <paramref name="source"/>; the "throw on empty" flag is set to true.
  /// </summary>
  public virtual IObservable<TSource> LastAsync<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      return LastOrDefaultAsync_(source, true);
#else
      // No predicate (null).
      return new LastAsync<TSource>(source, null, true);
#endif
  }
  /// <summary>
  /// Last element of <paramref name="source"/> satisfying <paramref name="predicate"/>.
  /// </summary>
  public virtual IObservable<TSource> LastAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      // Fallback: filter, then take the last of what remains.
      var matching = source.Where(predicate);
      return matching.LastAsync();
#else
      return new LastAsync<TSource>(source, predicate, true);
#endif
  }
  413. #endregion
  414. #region + LastOrDefaultAsync +
  /// <summary>
  /// Last element of <paramref name="source"/>; the "throw on empty" flag is set to false.
  /// </summary>
  public virtual IObservable<TSource> LastOrDefaultAsync<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      return LastOrDefaultAsync_(source, false);
#else
      // No predicate (null).
      return new LastAsync<TSource>(source, null, false);
#endif
  }
  /// <summary>
  /// Last element of <paramref name="source"/> satisfying <paramref name="predicate"/>, non-throwing variant.
  /// </summary>
  public virtual IObservable<TSource> LastOrDefaultAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      // Fallback: filter, then take the last-or-default of what remains.
      var matching = source.Where(predicate);
      return matching.LastOrDefaultAsync();
#else
      return new LastAsync<TSource>(source, predicate, false);
#endif
  }
  431. #if NO_PERF
  // Shared NO_PERF implementation for LastAsync / LastOrDefaultAsync.
  // Remembers the most recent element. On completion it emits that element (default(TSource) if none
  // was seen), unless throwOnEmpty is set and nothing was seen, which raises InvalidOperationException.
  private static IObservable<TSource> LastOrDefaultAsync_<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  {
      return new AnonymousObservable<TSource>(observer =>
      {
          var latest = default(TSource);
          var hasLatest = false;

          Action<TSource> onNext = item =>
          {
              latest = item;
              hasLatest = true;
          };

          Action onCompleted = () =>
          {
              if (hasLatest || !throwOnEmpty)
              {
                  observer.OnNext(latest);
                  observer.OnCompleted();
              }
              else
              {
                  observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
              }
          };

          return source.Subscribe(onNext, observer.OnError, onCompleted);
      });
  }
  460. #endif
  461. #endregion
  462. #region + LongCount +
  /// <summary>
  /// Counts the elements of <paramref name="source"/> as a long.
  /// </summary>
  public virtual IObservable<long> LongCount<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      // Fallback: overflow-checked increment per element, starting at 0L.
      return source.Aggregate(0L, (tally, ignored) => checked(tally + 1));
#else
      return new LongCount<TSource>(source);
#endif
  }
  /// <summary>
  /// Counts the elements of <paramref name="source"/> that satisfy <paramref name="predicate"/> as a long.
  /// </summary>
  public virtual IObservable<long> LongCount<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      // Fallback: filter, then overflow-checked increment per remaining element.
      var matching = source.Where(predicate);
      return matching.Aggregate(0L, (tally, ignored) => checked(tally + 1));
#else
      return new LongCount<TSource>(source, predicate);
#endif
  }
  479. #endregion
  480. #region + Max +
  /// <summary>
  /// Maximum element of <paramref name="source"/> under <c>Comparer&lt;TSource&gt;.Default</c>.
  /// </summary>
  public virtual IObservable<TSource> Max<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      // Fallback: MaxBy with an identity key, then the first entry of the resulting list.
      var maxima = MaxBy(source, item => item);
      return maxima.Select(list => list.First());
#else
      // BREAKING CHANGE v2 > v1.x - Behavior for reference types
      return new Max<TSource>(source, Comparer<TSource>.Default);
#endif
  }
  /// <summary>
  /// Maximum element of <paramref name="source"/> under <paramref name="comparer"/>.
  /// </summary>
  public virtual IObservable<TSource> Max<TSource>(IObservable<TSource> source, IComparer<TSource> comparer)
  {
#if NO_PERF
      // Fallback: MaxBy with an identity key, then the first entry of the resulting list.
      var maxima = MaxBy(source, item => item, comparer);
      return maxima.Select(list => list.First());
#else
      // BREAKING CHANGE v2 > v1.x - Behavior for reference types
      return new Max<TSource>(source, comparer);
#endif
  }
  /// <summary>
  /// Maximum of a sequence of double values.
  /// </summary>
  public virtual IObservable<double> Max(IObservable<double> source)
  {
#if !NO_PERF
      return new MaxDouble(source);
#else
      // Seedless Scan (the form used by the seedless Aggregate fallback in this class) starts from
      // the first element. Seeding with double.MinValue would mask double.NegativeInfinity:
      // Math.Max(double.MinValue, -inf) is double.MinValue, so an all -inf source would report
      // a value that never occurred.
      return source.Scan(Math.Max).Final();
#endif
  }
  /// <summary>
  /// Maximum of a sequence of float values.
  /// </summary>
  public virtual IObservable<float> Max(IObservable<float> source)
  {
#if !NO_PERF
      return new MaxSingle(source);
#else
      // Seedless Scan (the form used by the seedless Aggregate fallback in this class) starts from
      // the first element. Seeding with float.MinValue would mask float.NegativeInfinity:
      // Math.Max(float.MinValue, -inf) is float.MinValue, so an all -inf source would report
      // a value that never occurred.
      return source.Scan(Math.Max).Final();
#endif
  }
  /// <summary>
  /// Maximum of a sequence of decimal values.
  /// </summary>
  public virtual IObservable<decimal> Max(IObservable<decimal> source)
  {
#if NO_PERF
      // Fallback: running maximum seeded with decimal.MinValue, reduced by Final().
      var running = source.Scan(decimal.MinValue, (best, value) => Math.Max(best, value));
      return running.Final();
#else
      return new MaxDecimal(source);
#endif
  }
  /// <summary>
  /// Maximum of a sequence of int values.
  /// </summary>
  public virtual IObservable<int> Max(IObservable<int> source)
  {
#if NO_PERF
      // Fallback: running maximum seeded with int.MinValue, reduced by Final().
      var running = source.Scan(int.MinValue, (best, value) => Math.Max(best, value));
      return running.Final();
#else
      return new MaxInt32(source);
#endif
  }
  /// <summary>
  /// Maximum of a sequence of long values.
  /// </summary>
  public virtual IObservable<long> Max(IObservable<long> source)
  {
#if NO_PERF
      // Fallback: running maximum seeded with long.MinValue, reduced by Final().
      var running = source.Scan(long.MinValue, (best, value) => Math.Max(best, value));
      return running.Final();
#else
      return new MaxInt64(source);
#endif
  }
  /// <summary>
  /// Maximum of a sequence of nullable double values.
  /// </summary>
  public virtual IObservable<double?> Max(IObservable<double?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMax, starting from a null seed.
      return source.Aggregate(default(double?), NullableMax);
#else
      return new MaxDoubleNullable(source);
#endif
  }
  /// <summary>
  /// Maximum of a sequence of nullable float values.
  /// </summary>
  public virtual IObservable<float?> Max(IObservable<float?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMax, starting from a null seed.
      return source.Aggregate(default(float?), NullableMax);
#else
      return new MaxSingleNullable(source);
#endif
  }
  /// <summary>
  /// Maximum of a sequence of nullable decimal values.
  /// </summary>
  public virtual IObservable<decimal?> Max(IObservable<decimal?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMax, starting from a null seed.
      return source.Aggregate(default(decimal?), NullableMax);
#else
      return new MaxDecimalNullable(source);
#endif
  }
  /// <summary>
  /// Maximum of a sequence of nullable int values.
  /// </summary>
  public virtual IObservable<int?> Max(IObservable<int?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMax, starting from a null seed.
      return source.Aggregate(default(int?), NullableMax);
#else
      return new MaxInt32Nullable(source);
#endif
  }
  /// <summary>
  /// Maximum of a sequence of nullable long values.
  /// </summary>
  public virtual IObservable<long?> Max(IObservable<long?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMax, starting from a null seed.
      return source.Aggregate(default(long?), NullableMax);
#else
      return new MaxInt64Nullable(source);
#endif
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> and delegates to Max over the projected sequence.
  /// </summary>
  public virtual IObservable<TResult> Max<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> and delegates to Max with <paramref name="comparer"/>.
  /// </summary>
  public virtual IObservable<TResult> Max<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector, IComparer<TResult> comparer)
  {
      var projected = Select(source, selector);
      return Max(projected, comparer);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (double) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<double> Max<TSource>(IObservable<TSource> source, Func<TSource, double> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (float) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<float> Max<TSource>(IObservable<TSource> source, Func<TSource, float> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (decimal) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<decimal> Max<TSource>(IObservable<TSource> source, Func<TSource, decimal> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (int) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<int> Max<TSource>(IObservable<TSource> source, Func<TSource, int> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (long) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<long> Max<TSource>(IObservable<TSource> source, Func<TSource, long> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (double?) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<double?> Max<TSource>(IObservable<TSource> source, Func<TSource, double?> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (float?) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<float?> Max<TSource>(IObservable<TSource> source, Func<TSource, float?> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (decimal?) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<decimal?> Max<TSource>(IObservable<TSource> source, Func<TSource, decimal?> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (int?) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<int?> Max<TSource>(IObservable<TSource> source, Func<TSource, int?> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> (long?) and delegates to the matching Max overload.
  /// </summary>
  public virtual IObservable<long?> Max<TSource>(IObservable<TSource> source, Func<TSource, long?> selector)
  {
      var projected = Select(source, selector);
      return Max(projected);
  }
  627. #endregion
  628. #region + MaxBy +
  /// <summary>
  /// MaxBy over keys from <paramref name="keySelector"/>, compared with <c>Comparer&lt;TKey&gt;.Default</c>.
  /// </summary>
  public virtual IObservable<IList<TSource>> MaxBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
#if NO_PERF
      // Fallback: forward to the comparer overload.
      return MaxBy(source, keySelector, Comparer<TKey>.Default);
#else
      return new MaxBy<TSource, TKey>(source, keySelector, Comparer<TKey>.Default);
#endif
  }
  /// <summary>
  /// MaxBy over keys from <paramref name="keySelector"/>, compared with <paramref name="comparer"/>.
  /// </summary>
  public virtual IObservable<IList<TSource>> MaxBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
  {
#if NO_PERF
      // Fallback: the shared ExtremaBy helper receives the comparer unchanged.
      return ExtremaBy(source, keySelector, comparer);
#else
      return new MaxBy<TSource, TKey>(source, keySelector, comparer);
#endif
  }
  645. #endregion
  646. #region + Min +
  /// <summary>
  /// Minimum element of <paramref name="source"/> under <c>Comparer&lt;TSource&gt;.Default</c>.
  /// </summary>
  public virtual IObservable<TSource> Min<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      // Fallback: MinBy with an identity key, then the first entry of the resulting list.
      var minima = MinBy(source, item => item);
      return minima.Select(list => list.First());
#else
      // BREAKING CHANGE v2 > v1.x - Behavior for reference types
      return new Min<TSource>(source, Comparer<TSource>.Default);
#endif
  }
  /// <summary>
  /// Minimum element of <paramref name="source"/> under <paramref name="comparer"/>.
  /// </summary>
  public virtual IObservable<TSource> Min<TSource>(IObservable<TSource> source, IComparer<TSource> comparer)
  {
#if NO_PERF
      // Fallback: MinBy with an identity key, then the first entry of the resulting list.
      var minima = MinBy(source, item => item, comparer);
      return minima.Select(list => list.First());
#else
      // BREAKING CHANGE v2 > v1.x - Behavior for reference types
      return new Min<TSource>(source, comparer);
#endif
  }
  /// <summary>
  /// Minimum of a sequence of double values.
  /// </summary>
  public virtual IObservable<double> Min(IObservable<double> source)
  {
#if !NO_PERF
      return new MinDouble(source);
#else
      // Seedless Scan (the form used by the seedless Aggregate fallback in this class) starts from
      // the first element. Seeding with double.MaxValue would mask double.PositiveInfinity:
      // Math.Min(double.MaxValue, +inf) is double.MaxValue, so an all +inf source would report
      // a value that never occurred.
      return source.Scan(Math.Min).Final();
#endif
  }
  /// <summary>
  /// Minimum of a sequence of float values.
  /// </summary>
  public virtual IObservable<float> Min(IObservable<float> source)
  {
#if !NO_PERF
      return new MinSingle(source);
#else
      // Seedless Scan (the form used by the seedless Aggregate fallback in this class) starts from
      // the first element. Seeding with float.MaxValue would mask float.PositiveInfinity:
      // Math.Min(float.MaxValue, +inf) is float.MaxValue, so an all +inf source would report
      // a value that never occurred.
      return source.Scan(Math.Min).Final();
#endif
  }
  /// <summary>
  /// Minimum of a sequence of decimal values.
  /// </summary>
  public virtual IObservable<decimal> Min(IObservable<decimal> source)
  {
#if NO_PERF
      // Fallback: running minimum seeded with decimal.MaxValue, reduced by Final().
      var running = source.Scan(decimal.MaxValue, (best, value) => Math.Min(best, value));
      return running.Final();
#else
      return new MinDecimal(source);
#endif
  }
  /// <summary>
  /// Minimum of a sequence of int values.
  /// </summary>
  public virtual IObservable<int> Min(IObservable<int> source)
  {
#if NO_PERF
      // Fallback: running minimum seeded with int.MaxValue, reduced by Final().
      var running = source.Scan(int.MaxValue, (best, value) => Math.Min(best, value));
      return running.Final();
#else
      return new MinInt32(source);
#endif
  }
  /// <summary>
  /// Minimum of a sequence of long values.
  /// </summary>
  public virtual IObservable<long> Min(IObservable<long> source)
  {
#if NO_PERF
      // Fallback: running minimum seeded with long.MaxValue, reduced by Final().
      var running = source.Scan(long.MaxValue, (best, value) => Math.Min(best, value));
      return running.Final();
#else
      return new MinInt64(source);
#endif
  }
  /// <summary>
  /// Minimum of a sequence of nullable double values.
  /// </summary>
  public virtual IObservable<double?> Min(IObservable<double?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMin, starting from a null seed.
      return source.Aggregate(default(double?), NullableMin);
#else
      return new MinDoubleNullable(source);
#endif
  }
  /// <summary>
  /// Minimum of a sequence of nullable float values.
  /// </summary>
  public virtual IObservable<float?> Min(IObservable<float?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMin, starting from a null seed.
      return source.Aggregate(default(float?), NullableMin);
#else
      return new MinSingleNullable(source);
#endif
  }
  /// <summary>
  /// Minimum of a sequence of nullable decimal values.
  /// </summary>
  public virtual IObservable<decimal?> Min(IObservable<decimal?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMin, starting from a null seed.
      return source.Aggregate(default(decimal?), NullableMin);
#else
      return new MinDecimalNullable(source);
#endif
  }
  /// <summary>
  /// Minimum of a sequence of nullable int values.
  /// </summary>
  public virtual IObservable<int?> Min(IObservable<int?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMin, starting from a null seed.
      return source.Aggregate(default(int?), NullableMin);
#else
      return new MinInt32Nullable(source);
#endif
  }
  /// <summary>
  /// Minimum of a sequence of nullable long values.
  /// </summary>
  public virtual IObservable<long?> Min(IObservable<long?> source)
  {
#if NO_PERF
      // Fallback: fold with NullableMin, starting from a null seed.
      return source.Aggregate(default(long?), NullableMin);
#else
      return new MinInt64Nullable(source);
#endif
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> and delegates to Min over the projected sequence.
  /// </summary>
  public virtual IObservable<TResult> Min<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element with <paramref name="selector"/> and delegates to Min with <paramref name="comparer"/>.
  /// </summary>
  public virtual IObservable<TResult> Min<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector, IComparer<TResult> comparer)
  {
      var projected = Select(source, selector);
      return Min(projected, comparer);
  }
  /// <summary>
  /// Projects each element to a <see cref="double"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<double> Min<TSource>(IObservable<TSource> source, Func<TSource, double> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element to a <see cref="float"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<float> Min<TSource>(IObservable<TSource> source, Func<TSource, float> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element to a <see cref="decimal"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<decimal> Min<TSource>(IObservable<TSource> source, Func<TSource, decimal> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element to an <see cref="int"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<int> Min<TSource>(IObservable<TSource> source, Func<TSource, int> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element to a <see cref="long"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<long> Min<TSource>(IObservable<TSource> source, Func<TSource, long> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="double"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<double?> Min<TSource>(IObservable<TSource> source, Func<TSource, double?> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="float"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<float?> Min<TSource>(IObservable<TSource> source, Func<TSource, float?> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="decimal"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<decimal?> Min<TSource>(IObservable<TSource> source, Func<TSource, decimal?> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="int"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<int?> Min<TSource>(IObservable<TSource> source, Func<TSource, int?> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="long"/> and passes the result to the matching <c>Min</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Min</c> over the projected values.</returns>
  public virtual IObservable<long?> Min<TSource>(IObservable<TSource> source, Func<TSource, long?> selector)
  {
      var projected = Select(source, selector);
      return Min(projected);
  }
  793. #endregion
  794. #region + MinBy +
  /// <summary>
  /// <c>MinBy</c> using the default comparer for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="source">Sequence to examine.</param>
  /// <param name="keySelector">Extracts the key used for comparison.</param>
  /// <returns>
  /// The MinBy operator in regular builds; in NO_PERF builds, the result of the comparer-based overload.
  /// </returns>
  public virtual IObservable<IList<TSource>> MinBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      var defaultComparer = Comparer<TKey>.Default;
#if NO_PERF
      return MinBy(source, keySelector, defaultComparer);
#else
      return new MinBy<TSource, TKey>(source, keySelector, defaultComparer);
#endif
  }
  /// <summary>
  /// Builds an observable list of the elements of <paramref name="source"/> whose key is
  /// minimal under <paramref name="comparer"/>.
  /// </summary>
  /// <param name="source">Sequence to examine.</param>
  /// <param name="keySelector">Extracts the key used for comparison.</param>
  /// <param name="comparer">Ordering applied to the keys.</param>
  /// <returns>
  /// The MinBy operator in regular builds; in NO_PERF builds, ExtremaBy driven by the reversed ordering.
  /// </returns>
  public virtual IObservable<IList<TSource>> MinBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
  {
#if !NO_PERF
      return new MinBy<TSource, TKey>(source, keySelector, comparer);
#else
      // ExtremaBy retains the elements with the greatest key, so the ordering is reversed here.
      // The operands are swapped instead of negating the result: negating int.MinValue
      // overflows back to int.MinValue, which would leave the sign (and the ordering) unreversed.
      return ExtremaBy(source, keySelector, new AnonymousComparer<TKey>((x, y) => comparer.Compare(y, x)));
#endif
  }
  811. #endregion
  812. #region + SequenceEqual +
  /// <summary>
  /// Compares two observable sequences using the default equality comparer for <typeparamref name="TSource"/>.
  /// </summary>
  /// <param name="first">First sequence.</param>
  /// <param name="second">Second sequence.</param>
  /// <returns>
  /// The SequenceEqual operator in regular builds; in NO_PERF builds, the result of the comparer-based overload.
  /// </returns>
  public virtual IObservable<bool> SequenceEqual<TSource>(IObservable<TSource> first, IObservable<TSource> second)
  {
      var defaultComparer = EqualityComparer<TSource>.Default;
#if NO_PERF
      return first.SequenceEqual(second, defaultComparer);
#else
      return new SequenceEqual<TSource>(first, second, defaultComparer);
#endif
  }
  /// <summary>
  /// Compares two observable sequences element by element using <paramref name="comparer"/>.
  /// </summary>
  /// <param name="first">First sequence.</param>
  /// <param name="second">Second sequence.</param>
  /// <param name="comparer">Equality comparer applied to each pair of elements.</param>
  /// <returns>
  /// The SequenceEqual operator in regular builds. In NO_PERF builds, an observable that emits
  /// <c>false</c> as soon as a mismatch in value or length is detected, or <c>true</c> once both
  /// sources have completed with every element paired; an exception thrown by the comparer, or an
  /// error from either source, is forwarded through OnError.
  /// </returns>
  public virtual IObservable<bool> SequenceEqual<TSource>(IObservable<TSource> first, IObservable<TSource> second, IEqualityComparer<TSource> comparer)
  {
#if !NO_PERF
      return new SequenceEqual<TSource>(first, second, comparer);
#else
      return new AnonymousObservable<bool>(observer =>
      {
          // One lock guards both queues, both completion flags and every call into the observer.
          var gate = new object();
          var donel = false;
          var doner = false;
          // Elements received from one side that are still waiting for their counterpart.
          var ql = new Queue<TSource>();
          var qr = new Queue<TSource>();

          // Publishes the verdict and ends the result sequence. Callers must hold the gate.
          Action<bool> conclude = result =>
          {
              observer.OnNext(result);
              observer.OnCompleted();
          };

          // Source errors take the same gate as the other notifications so that an error from
          // one source cannot reach the observer while the other source is being processed.
          Action<Exception> fail = error =>
          {
              lock (gate)
              {
                  observer.OnError(error);
              }
          };

          var subscription1 = first.Subscribe(
              x =>
              {
                  lock (gate)
                  {
                      if (qr.Count > 0)
                      {
                          // The right side is ahead: compare against its oldest pending element.
                          var equal = false;
                          var v = qr.Dequeue();
                          try
                          {
                              equal = comparer.Equals(x, v);
                          }
                          catch (Exception exception)
                          {
                              observer.OnError(exception);
                              return;
                          }
                          if (!equal)
                          {
                              conclude(false);
                          }
                      }
                      else if (doner)
                      {
                          // The right side has finished with nothing pending, so the left is longer.
                          conclude(false);
                      }
                      else
                      {
                          ql.Enqueue(x);
                      }
                  }
              },
              fail,
              () =>
              {
                  lock (gate)
                  {
                      donel = true;
                      if (ql.Count == 0)
                      {
                          if (qr.Count > 0)
                          {
                              // Unpaired right elements remain, so the right is longer.
                              conclude(false);
                          }
                          else if (doner)
                          {
                              conclude(true);
                          }
                      }
                  }
              });

          var subscription2 = second.Subscribe(
              x =>
              {
                  lock (gate)
                  {
                      if (ql.Count > 0)
                      {
                          // The left side is ahead: compare against its oldest pending element.
                          var equal = false;
                          var v = ql.Dequeue();
                          try
                          {
                              equal = comparer.Equals(v, x);
                          }
                          catch (Exception exception)
                          {
                              observer.OnError(exception);
                              return;
                          }
                          if (!equal)
                          {
                              conclude(false);
                          }
                      }
                      else if (donel)
                      {
                          // The left side has finished with nothing pending, so the right is longer.
                          conclude(false);
                      }
                      else
                      {
                          qr.Enqueue(x);
                      }
                  }
              },
              fail,
              () =>
              {
                  lock (gate)
                  {
                      doner = true;
                      if (qr.Count == 0)
                      {
                          if (ql.Count > 0)
                          {
                              // Unpaired left elements remain, so the left is longer.
                              conclude(false);
                          }
                          else if (donel)
                          {
                              conclude(true);
                          }
                      }
                  }
              });

          return new CompositeDisposable(subscription1, subscription2);
      });
#endif
  }
  /// <summary>
  /// Compares an observable sequence with an enumerable sequence using the default equality
  /// comparer for <typeparamref name="TSource"/>.
  /// </summary>
  /// <param name="first">Observable sequence.</param>
  /// <param name="second">Enumerable sequence to compare against.</param>
  /// <returns>
  /// The SequenceEqual operator in regular builds; in NO_PERF builds, the result of the comparer-based overload.
  /// </returns>
  public virtual IObservable<bool> SequenceEqual<TSource>(IObservable<TSource> first, IEnumerable<TSource> second)
  {
      var defaultComparer = EqualityComparer<TSource>.Default;
#if NO_PERF
      return SequenceEqual<TSource>(first, second, defaultComparer);
#else
      return new SequenceEqual<TSource>(first, second, defaultComparer);
#endif
  }
  /// <summary>
  /// Compares an observable sequence with an enumerable sequence element by element using
  /// <paramref name="comparer"/>.
  /// </summary>
  /// <param name="first">Observable sequence.</param>
  /// <param name="second">Enumerable sequence to compare against.</param>
  /// <param name="comparer">Equality comparer applied to each pair of elements.</param>
  /// <returns>
  /// The SequenceEqual operator in regular builds. In NO_PERF builds, an observable that emits
  /// <c>false</c> on the first mismatch (or when the enumerable runs out first), and otherwise
  /// emits whether the enumerable is exhausted when <paramref name="first"/> completes.
  /// </returns>
  public virtual IObservable<bool> SequenceEqual<TSource>(IObservable<TSource> first, IEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
  {
#if NO_PERF
      return new AnonymousObservable<bool>(observer =>
      {
          IEnumerator<TSource> cursor = null;
          try
          {
              cursor = second.GetEnumerator();
          }
          catch (Exception ex)
          {
              observer.OnError(ex);
              return Disposable.Empty;
          }

          // Advance the enumerator once per observed value and compare the pair.
          Action<TSource> onValue = value =>
          {
              var matches = false;
              try
              {
                  matches = cursor.MoveNext() && comparer.Equals(value, cursor.Current);
              }
              catch (Exception ex)
              {
                  observer.OnError(ex);
                  return;
              }

              if (matches)
              {
                  return;
              }

              observer.OnNext(false);
              observer.OnCompleted();
          };

          // On completion the sequences are equal only if the enumerable has nothing left.
          Action onDone = () =>
          {
              var leftover = false;
              try
              {
                  leftover = cursor.MoveNext();
              }
              catch (Exception exception)
              {
                  observer.OnError(exception);
                  return;
              }

              observer.OnNext(!leftover);
              observer.OnCompleted();
          };

          var subscription = first.Subscribe(onValue, observer.OnError, onDone);

          // The enumerator is disposed together with the subscription.
          return new CompositeDisposable(subscription, cursor);
      });
#else
      return new SequenceEqual<TSource>(first, second, comparer);
#endif
  }
  1017. #endregion
  1018. #region + SingleAsync +
  /// <summary>
  /// Returns an observable for the single element of <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence expected to contain one element.</param>
  /// <returns>
  /// The SingleAsync operator (no predicate) in regular builds. NO_PERF builds use
  /// SingleOrDefaultAsync_ with throwOnEmpty set, so an empty source is reported as an error.
  /// </returns>
  public virtual IObservable<TSource> SingleAsync<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      return SingleOrDefaultAsync_(source, true);
#else
      return new SingleAsync<TSource>(source, null, true);
#endif
  }
  /// <summary>
  /// Returns an observable for the single element of <paramref name="source"/> that satisfies
  /// <paramref name="predicate"/>.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="predicate">Condition an element must meet.</param>
  /// <returns>
  /// The SingleAsync operator in regular builds; in NO_PERF builds, <c>Where</c> followed by the
  /// parameterless <c>SingleAsync</c>.
  /// </returns>
  public virtual IObservable<TSource> SingleAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      var matching = source.Where(predicate);
      return matching.SingleAsync();
#else
      return new SingleAsync<TSource>(source, predicate, true);
#endif
  }
  1035. #endregion
  1036. #region + SingleOrDefaultAsync +
  /// <summary>
  /// Returns an observable for the single element of <paramref name="source"/>, or the default
  /// value when there is none.
  /// </summary>
  /// <param name="source">Sequence expected to contain at most one element.</param>
  /// <returns>
  /// The SingleAsync operator (no predicate) in regular builds. NO_PERF builds use
  /// SingleOrDefaultAsync_ with throwOnEmpty cleared, so an empty source yields default(TSource).
  /// </returns>
  public virtual IObservable<TSource> SingleOrDefaultAsync<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      return SingleOrDefaultAsync_(source, false);
#else
      return new SingleAsync<TSource>(source, null, false);
#endif
  }
  /// <summary>
  /// Returns an observable for the single element of <paramref name="source"/> that satisfies
  /// <paramref name="predicate"/>, or the default value when there is none.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="predicate">Condition an element must meet.</param>
  /// <returns>
  /// The SingleAsync operator in regular builds; in NO_PERF builds, <c>Where</c> followed by the
  /// parameterless <c>SingleOrDefaultAsync</c>.
  /// </returns>
  public virtual IObservable<TSource> SingleOrDefaultAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
#if NO_PERF
      var matching = source.Where(predicate);
      return matching.SingleOrDefaultAsync();
#else
      return new SingleAsync<TSource>(source, predicate, false);
#endif
  }
  1053. #if NO_PERF
  /// <summary>
  /// Shared fallback for the single-element operators: remembers the first element, reports an
  /// error when a second one arrives, and publishes the remembered element on completion.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="throwOnEmpty">
  /// When true, an empty source produces an InvalidOperationException; when false it
  /// produces default(TSource).
  /// </param>
  /// <returns>An observable carrying the single element, or an error.</returns>
  private static IObservable<TSource> SingleOrDefaultAsync_<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  {
      return new AnonymousObservable<TSource>(observer =>
      {
          var captured = default(TSource);
          var hasCaptured = false;

          Action<TSource> onValue = x =>
          {
              if (!hasCaptured)
              {
                  captured = x;
                  hasCaptured = true;
                  return;
              }

              // A second element violates the single-element contract.
              observer.OnError(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT));
          };

          Action onDone = () =>
          {
              if (hasCaptured || !throwOnEmpty)
              {
                  // Either the one element, or default(TSource) for an empty source.
                  observer.OnNext(captured);
                  observer.OnCompleted();
              }
              else
              {
                  observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
              }
          };

          return source.Subscribe(onValue, observer.OnError, onDone);
      });
  }
  1089. #endif
  1090. #endregion
  1091. #region + Sum +
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of <see cref="double"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumDouble operator in regular builds; in NO_PERF builds, an Aggregate that adds from 0.0.
  /// </returns>
  public virtual IObservable<double> Sum(IObservable<double> source)
  {
#if NO_PERF
      return source.Aggregate(0.0, (total, item) => total + item);
#else
      return new SumDouble(source);
#endif
  }
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of <see cref="float"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumSingle operator in regular builds; in NO_PERF builds, an Aggregate that adds from 0f.
  /// </returns>
  public virtual IObservable<float> Sum(IObservable<float> source)
  {
#if NO_PERF
      return source.Aggregate(0f, (total, item) => total + item);
#else
      return new SumSingle(source);
#endif
  }
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of <see cref="decimal"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumDecimal operator in regular builds; in NO_PERF builds, an Aggregate that adds from 0M.
  /// </returns>
  public virtual IObservable<decimal> Sum(IObservable<decimal> source)
  {
#if NO_PERF
      return source.Aggregate(0M, (total, item) => total + item);
#else
      return new SumDecimal(source);
#endif
  }
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of <see cref="int"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumInt32 operator in regular builds; in NO_PERF builds, an Aggregate that adds from 0
  /// with overflow checking enabled.
  /// </returns>
  public virtual IObservable<int> Sum(IObservable<int> source)
  {
#if NO_PERF
      // checked(...) turns an overflowing addition into an exception instead of wrapping.
      return source.Aggregate(0, (total, item) => checked(total + item));
#else
      return new SumInt32(source);
#endif
  }
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of <see cref="long"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumInt64 operator in regular builds; in NO_PERF builds, an Aggregate that adds from 0L
  /// with overflow checking enabled.
  /// </returns>
  public virtual IObservable<long> Sum(IObservable<long> source)
  {
#if NO_PERF
      // checked(...) turns an overflowing addition into an exception instead of wrapping.
      return source.Aggregate(0L, (total, item) => checked(total + item));
#else
      return new SumInt64(source);
#endif
  }
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of nullable <see cref="double"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumDoubleNullable operator in regular builds; in NO_PERF builds, an Aggregate whose
  /// total is converted back to a nullable.
  /// </returns>
  public virtual IObservable<double?> Sum(IObservable<double?> source)
  {
#if NO_PERF
      // GetValueOrDefault makes a missing value contribute zero to the total.
      var total = source.Aggregate(0.0, (acc, item) => acc + item.GetValueOrDefault());
      return total.Select(sum => (double?)sum);
#else
      return new SumDoubleNullable(source);
#endif
  }
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of nullable <see cref="float"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumSingleNullable operator in regular builds; in NO_PERF builds, an Aggregate whose
  /// total is converted back to a nullable.
  /// </returns>
  public virtual IObservable<float?> Sum(IObservable<float?> source)
  {
#if NO_PERF
      // GetValueOrDefault makes a missing value contribute zero to the total.
      var total = source.Aggregate(0f, (acc, item) => acc + item.GetValueOrDefault());
      return total.Select(sum => (float?)sum);
#else
      return new SumSingleNullable(source);
#endif
  }
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of nullable <see cref="decimal"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumDecimalNullable operator in regular builds; in NO_PERF builds, an Aggregate whose
  /// total is converted back to a nullable.
  /// </returns>
  public virtual IObservable<decimal?> Sum(IObservable<decimal?> source)
  {
#if NO_PERF
      // GetValueOrDefault makes a missing value contribute zero to the total.
      var total = source.Aggregate(0M, (acc, item) => acc + item.GetValueOrDefault());
      return total.Select(sum => (decimal?)sum);
#else
      return new SumDecimalNullable(source);
#endif
  }
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of nullable <see cref="int"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumInt32Nullable operator in regular builds; in NO_PERF builds, an overflow-checked
  /// Aggregate whose total is converted back to a nullable.
  /// </returns>
  public virtual IObservable<int?> Sum(IObservable<int?> source)
  {
#if NO_PERF
      // Missing values count as zero; checked(...) raises on overflow instead of wrapping.
      var total = source.Aggregate(0, (acc, item) => checked(acc + item.GetValueOrDefault()));
      return total.Select(sum => (int?)sum);
#else
      return new SumInt32Nullable(source);
#endif
  }
  /// <summary>
  /// Returns an observable that reports the sum of a sequence of nullable <see cref="long"/> values.
  /// </summary>
  /// <param name="source">Sequence of values to add up.</param>
  /// <returns>
  /// The SumInt64Nullable operator in regular builds; in NO_PERF builds, an overflow-checked
  /// Aggregate whose total is converted back to a nullable.
  /// </returns>
  public virtual IObservable<long?> Sum(IObservable<long?> source)
  {
#if NO_PERF
      // Missing values count as zero; checked(...) raises on overflow instead of wrapping.
      var total = source.Aggregate(0L, (acc, item) => checked(acc + item.GetValueOrDefault()));
      return total.Select(sum => (long?)sum);
#else
      return new SumInt64Nullable(source);
#endif
  }
  /// <summary>
  /// Projects each element to a <see cref="double"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<double> Sum<TSource>(IObservable<TSource> source, Func<TSource, double> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  /// <summary>
  /// Projects each element to a <see cref="float"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<float> Sum<TSource>(IObservable<TSource> source, Func<TSource, float> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  /// <summary>
  /// Projects each element to a <see cref="decimal"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<decimal> Sum<TSource>(IObservable<TSource> source, Func<TSource, decimal> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  /// <summary>
  /// Projects each element to an <see cref="int"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<int> Sum<TSource>(IObservable<TSource> source, Func<TSource, int> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  /// <summary>
  /// Projects each element to a <see cref="long"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<long> Sum<TSource>(IObservable<TSource> source, Func<TSource, long> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="double"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<double?> Sum<TSource>(IObservable<TSource> source, Func<TSource, double?> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="float"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<float?> Sum<TSource>(IObservable<TSource> source, Func<TSource, float?> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="decimal"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<decimal?> Sum<TSource>(IObservable<TSource> source, Func<TSource, decimal?> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="int"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<int?> Sum<TSource>(IObservable<TSource> source, Func<TSource, int?> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  /// <summary>
  /// Projects each element to a nullable <see cref="long"/> and passes the result to the matching <c>Sum</c> overload.
  /// </summary>
  /// <param name="source">Sequence to project.</param>
  /// <param name="selector">Projection applied to every element.</param>
  /// <returns>The observable produced by <c>Sum</c> over the projected values.</returns>
  public virtual IObservable<long?> Sum<TSource>(IObservable<TSource> source, Func<TSource, long?> selector)
  {
      var projected = Select(source, selector);
      return Sum(projected);
  }
  1212. #endregion
  1213. #region + ToArray +
  /// <summary>
  /// Returns an observable that delivers the elements of <paramref name="source"/> as an array.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <returns>
  /// The ToArray operator in regular builds; in NO_PERF builds, <c>ToList</c> followed by a
  /// conversion of the list to an array.
  /// </returns>
  public virtual IObservable<TSource[]> ToArray<TSource>(IObservable<TSource> source)
  {
#if NO_PERF
      var buffered = source.ToList();
      return buffered.Select(items => items.ToArray());
#else
      return new ToArray<TSource>(source);
#endif
  }
  1222. #endregion
  1223. #region + ToDictionary +
  /// <summary>
  /// Returns an observable that delivers a dictionary built from <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <param name="keySelector">Extracts the dictionary key from an element.</param>
  /// <param name="elementSelector">Extracts the dictionary value from an element.</param>
  /// <param name="comparer">Equality comparer used for the keys.</param>
  /// <returns>
  /// The ToDictionary operator in regular builds; in NO_PERF builds, an Aggregate that adds one
  /// entry per element (a repeated key makes <c>Add</c> throw).
  /// </returns>
  public virtual IObservable<IDictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  {
#if !NO_PERF
      return new ToDictionary<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
#else
      // The seed dictionary is created inside the subscribe callback so every subscription fills
      // its own instance. Creating it once up front would make a second subscription add the same
      // keys to the dictionary already populated by the first one.
      return new AnonymousObservable<IDictionary<TKey, TElement>>(observer =>
          source.Aggregate((IDictionary<TKey, TElement>)new Dictionary<TKey, TElement>(comparer), (dict, x) =>
          {
              dict.Add(keySelector(x), elementSelector(x));
              return dict;
          }).Subscribe(observer));
#endif
  }
  /// <summary>
  /// <c>ToDictionary</c> with key and element selectors, using the default equality comparer
  /// for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <param name="keySelector">Extracts the dictionary key from an element.</param>
  /// <param name="elementSelector">Extracts the dictionary value from an element.</param>
  /// <returns>
  /// The ToDictionary operator in regular builds; in NO_PERF builds, the result of the
  /// comparer-based <c>ToDictionary</c> call on <paramref name="source"/>.
  /// </returns>
  public virtual IObservable<IDictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  {
      var defaultComparer = EqualityComparer<TKey>.Default;
#if NO_PERF
      return source.ToDictionary(keySelector, elementSelector, defaultComparer);
#else
      return new ToDictionary<TSource, TKey, TElement>(source, keySelector, elementSelector, defaultComparer);
#endif
  }
  /// <summary>
  /// <c>ToDictionary</c> that stores the elements themselves as values, keyed through
  /// <paramref name="keySelector"/> and compared with <paramref name="comparer"/>.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <param name="keySelector">Extracts the dictionary key from an element.</param>
  /// <param name="comparer">Equality comparer used for the keys.</param>
  /// <returns>
  /// The ToDictionary operator in regular builds; in NO_PERF builds, the result of the
  /// four-argument <c>ToDictionary</c> call with an identity element selector.
  /// </returns>
  public virtual IObservable<IDictionary<TKey, TSource>> ToDictionary<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      // Identity projection: each element is its own dictionary value.
      Func<TSource, TSource> identity = element => element;
#if NO_PERF
      return source.ToDictionary(keySelector, identity, comparer);
#else
      return new ToDictionary<TSource, TKey, TSource>(source, keySelector, identity, comparer);
#endif
  }
  /// <summary>
  /// <c>ToDictionary</c> that stores the elements themselves as values, keyed through
  /// <paramref name="keySelector"/> with the default equality comparer for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <param name="keySelector">Extracts the dictionary key from an element.</param>
  /// <returns>
  /// The ToDictionary operator in regular builds; in NO_PERF builds, the result of the
  /// four-argument <c>ToDictionary</c> call with an identity element selector.
  /// </returns>
  public virtual IObservable<IDictionary<TKey, TSource>> ToDictionary<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      // Identity projection: each element is its own dictionary value.
      Func<TSource, TSource> identity = element => element;
      var defaultComparer = EqualityComparer<TKey>.Default;
#if NO_PERF
      return source.ToDictionary(keySelector, identity, defaultComparer);
#else
      return new ToDictionary<TSource, TKey, TSource>(source, keySelector, identity, defaultComparer);
#endif
  }
  1260. #endregion
  1261. #region + ToList +
  /// <summary>
  /// Returns an observable that delivers the elements of <paramref name="source"/> as a list.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <returns>
  /// The ToList operator in regular builds; in NO_PERF builds, an Aggregate that appends every
  /// element to a list.
  /// </returns>
  public virtual IObservable<IList<TSource>> ToList<TSource>(IObservable<TSource> source)
  {
#if !NO_PERF
      return new ToList<TSource>(source);
#else
      // The seed list is created inside the subscribe callback so every subscription collects
      // into its own list. Creating it once up front would make a second subscription append to
      // the list already filled by the first one.
      return new AnonymousObservable<IList<TSource>>(observer =>
          source.Aggregate((IList<TSource>)new List<TSource>(), (list, x) =>
          {
              list.Add(x);
              return list;
          }).Subscribe(observer));
#endif
  }
  1274. #endregion
  1275. #region + ToLookup +
  /// <summary>
  /// Returns an observable that delivers a lookup built from <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <param name="keySelector">Extracts the lookup key from an element.</param>
  /// <param name="elementSelector">Extracts the stored value from an element.</param>
  /// <param name="comparer">Equality comparer used for the keys.</param>
  /// <returns>
  /// The ToLookup operator in regular builds; in NO_PERF builds, an Aggregate that adds one
  /// entry per element and exposes the result as an <see cref="ILookup{TKey, TElement}"/>.
  /// </returns>
  public virtual IObservable<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  {
#if !NO_PERF
      return new ToLookup<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
#else
      // The seed lookup is created inside the subscribe callback so every subscription fills its
      // own instance. Creating it once up front would make a second subscription add its
      // elements to the lookup already filled by the first one.
      return new AnonymousObservable<ILookup<TKey, TElement>>(observer =>
          source.Aggregate(new Lookup<TKey, TElement>(comparer), (lookup, x) =>
          {
              lookup.Add(keySelector(x), elementSelector(x));
              return lookup;
          }).Select(xs => (ILookup<TKey, TElement>)xs).Subscribe(observer));
#endif
  }
  /// <summary>
  /// <c>ToLookup</c> that stores the elements themselves, keyed through
  /// <paramref name="keySelector"/> and compared with <paramref name="comparer"/>.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <param name="keySelector">Extracts the lookup key from an element.</param>
  /// <param name="comparer">Equality comparer used for the keys.</param>
  /// <returns>
  /// The ToLookup operator in regular builds; in NO_PERF builds, the result of the
  /// four-argument <c>ToLookup</c> call with an identity element selector.
  /// </returns>
  public virtual IObservable<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      // Identity projection: each element is stored as-is.
      Func<TSource, TSource> identity = element => element;
#if NO_PERF
      return source.ToLookup(keySelector, identity, comparer);
#else
      return new ToLookup<TSource, TKey, TSource>(source, keySelector, identity, comparer);
#endif
  }
  /// <summary>
  /// <c>ToLookup</c> with key and element selectors, using the default equality comparer
  /// for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <param name="keySelector">Extracts the lookup key from an element.</param>
  /// <param name="elementSelector">Extracts the stored value from an element.</param>
  /// <returns>
  /// The ToLookup operator in regular builds; in NO_PERF builds, the result of the
  /// comparer-based <c>ToLookup</c> call on <paramref name="source"/>.
  /// </returns>
  public virtual IObservable<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  {
      var defaultComparer = EqualityComparer<TKey>.Default;
#if NO_PERF
      return source.ToLookup(keySelector, elementSelector, defaultComparer);
#else
      return new ToLookup<TSource, TKey, TElement>(source, keySelector, elementSelector, defaultComparer);
#endif
  }
  /// <summary>
  /// <c>ToLookup</c> that stores the elements themselves, keyed through
  /// <paramref name="keySelector"/> with the default equality comparer for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="source">Sequence to collect.</param>
  /// <param name="keySelector">Extracts the lookup key from an element.</param>
  /// <returns>
  /// The ToLookup operator in regular builds; in NO_PERF builds, the result of the
  /// four-argument <c>ToLookup</c> call with an identity element selector.
  /// </returns>
  public virtual IObservable<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      // Identity projection: each element is stored as-is.
      Func<TSource, TSource> identity = element => element;
      var defaultComparer = EqualityComparer<TKey>.Default;
#if NO_PERF
      return source.ToLookup(keySelector, identity, defaultComparer);
#else
      return new ToLookup<TSource, TKey, TSource>(source, keySelector, identity, defaultComparer);
#endif
  }
  1312. #endregion
  1313. #region |> Helpers <|
  1314. #if NO_PERF
  /// <summary>
  /// Returns the smaller of two nullable values, treating a missing value as "no candidate".
  /// </summary>
  /// <param name="x">First operand; returned when both operands compare equal.</param>
  /// <param name="y">Second operand.</param>
  /// <returns>
  /// The other operand when one has no value (null when both are empty); otherwise whichever
  /// operand <c>CompareTo</c> ranks lower.
  /// </returns>
  private static T? NullableMin<T>(T? x, T? y)
      where T : struct, IComparable<T>
  {
      if (x.HasValue && y.HasValue)
      {
          return x.Value.CompareTo(y.Value) <= 0 ? x : y;
      }

      // At most one operand carries a value: prefer it, otherwise fall through to y (possibly null).
      return x.HasValue ? x : y;
  }
  /// <summary>
  /// Returns the larger of two nullable values, treating a missing value as "no candidate".
  /// </summary>
  /// <param name="x">First operand; returned when both operands compare equal.</param>
  /// <param name="y">Second operand.</param>
  /// <returns>
  /// The other operand when one has no value (null when both are empty); otherwise whichever
  /// operand <c>CompareTo</c> ranks higher.
  /// </returns>
  private static T? NullableMax<T>(T? x, T? y)
      where T : struct, IComparable<T>
  {
      if (x.HasValue && y.HasValue)
      {
          return x.Value.CompareTo(y.Value) >= 0 ? x : y;
      }

      // At most one operand carries a value: prefer it, otherwise fall through to y (possibly null).
      return x.HasValue ? x : y;
  }
  /// <summary>
  /// Collects the elements of <paramref name="source"/> whose key ranks highest under
  /// <paramref name="comparer"/>, and publishes them as one list when the source completes.
  /// </summary>
  /// <param name="source">Sequence to examine.</param>
  /// <param name="keySelector">Extracts the key used for comparison.</param>
  /// <param name="comparer">Ordering applied to the keys; a positive result marks a new best key.</param>
  /// <returns>
  /// An observable that emits the list of best-ranked elements (empty for an empty source) and
  /// then completes; exceptions from the selector or the comparer are forwarded through OnError.
  /// </returns>
  private static IObservable<IList<TSource>> ExtremaBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
  {
      return new AnonymousObservable<IList<TSource>>(observer =>
      {
          var seeded = false;
          var bestKey = default(TKey);
          var winners = new List<TSource>();

          Action<TSource> onValue = x =>
          {
              var key = default(TKey);
              try
              {
                  key = keySelector(x);
              }
              catch (Exception ex)
              {
                  observer.OnError(ex);
                  return;
              }

              // 0 doubles as the result for the very first element, which always joins the list.
              var order = 0;
              if (seeded)
              {
                  try
                  {
                      order = comparer.Compare(key, bestKey);
                  }
                  catch (Exception ex)
                  {
                      observer.OnError(ex);
                      return;
                  }
              }
              else
              {
                  seeded = true;
                  bestKey = key;
              }

              if (order < 0)
              {
                  // Ranks below the current best: ignore.
                  return;
              }

              if (order > 0)
              {
                  // Strictly better key: previous winners are discarded.
                  bestKey = key;
                  winners.Clear();
              }

              winners.Add(x);
          };

          Action onDone = () =>
          {
              observer.OnNext(winners);
              observer.OnCompleted();
          };

          return source.Subscribe(onValue, observer.OnError, onDone);
      });
  }
  1394. #endif
  1395. #endregion
  1396. }
  1397. #region |> Helper types <|
  1398. #if NO_PERF
  /// <summary>
  /// Extension helpers used by the NO_PERF aggregate fallbacks.
  /// </summary>
  static class AggregateExtensions
  {
      /// <summary>
      /// Remembers the most recent element of <paramref name="source"/> and publishes it when the
      /// source completes.
      /// </summary>
      /// <param name="source">Sequence to observe.</param>
      /// <returns>
      /// An observable that emits the last element followed by completion, or an
      /// InvalidOperationException when the source completed without any element.
      /// </returns>
      public static IObservable<TSource> Final<TSource>(this IObservable<TSource> source)
      {
          return new AnonymousObservable<TSource>(observer =>
          {
              var latest = default(TSource);
              var sawAny = false;

              Action<TSource> remember = x =>
              {
                  sawAny = true;
                  latest = x;
              };

              Action finish = () =>
              {
                  if (sawAny)
                  {
                      observer.OnNext(latest);
                      observer.OnCompleted();
                      return;
                  }

                  observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
              };

              return source.Subscribe(remember, observer.OnError, finish);
          });
      }
  }
  /// <summary>
  /// Adapts a comparison delegate to the <see cref="IComparer{T}"/> interface.
  /// </summary>
  sealed class AnonymousComparer<T> : IComparer<T>
  {
      // Delegate that performs the actual comparison.
      private readonly Func<T, T, int> compareCore;

      /// <summary>
      /// Wraps the given comparison delegate in an <see cref="IComparer{T}"/> instance.
      /// </summary>
      /// <param name="comparer">Delegate invoked for every comparison.</param>
      public AnonymousComparer(Func<T, T, int> comparer)
      {
          compareCore = comparer;
      }

      /// <summary>
      /// Compares two values by invoking the wrapped delegate.
      /// </summary>
      /// <param name="x">First value.</param>
      /// <param name="y">Second value.</param>
      /// <returns>Whatever the wrapped delegate returns for (<paramref name="x"/>, <paramref name="y"/>).</returns>
      public int Compare(T x, T y)
      {
          var outcome = compareCore(x, y);
          return outcome;
      }
  }
  1445. #endif
  1446. #endregion
  1447. }