AsyncEnumerable.Aggregates.cs 74 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading.Tasks;
  6. using System.Threading;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. public static Task<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
  12. {
  13. if (source == null)
  14. throw new ArgumentNullException("source");
  15. if (accumulator == null)
  16. throw new ArgumentNullException("accumulator");
  17. if (resultSelector == null)
  18. throw new ArgumentNullException("resultSelector");
  19. var tcs = new TaskCompletionSource<TResult>();
  20. var acc = seed;
  21. var e = source.GetEnumerator();
  22. var f = default(Action<CancellationToken>);
  23. f = ct => e.MoveNext(ct).ContinueWith(t =>
  24. {
  25. t.Handle(tcs, res =>
  26. {
  27. if (res)
  28. {
  29. try
  30. {
  31. acc = accumulator(acc, e.Current);
  32. f(ct);
  33. }
  34. catch (Exception exception)
  35. {
  36. tcs.TrySetException(exception);
  37. }
  38. }
  39. else
  40. {
  41. var result = default(TResult);
  42. try
  43. {
  44. result = resultSelector(acc);
  45. }
  46. catch (Exception exception)
  47. {
  48. tcs.TrySetException(exception);
  49. return;
  50. }
  51. tcs.TrySetResult(result);
  52. }
  53. });
  54. });
  55. f(cancellationToken);
  56. return tcs.Task.Finally(e.Dispose);
  57. }
  58. public static Task<TAccumulate> Aggregate<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken)
  59. {
  60. if (source == null)
  61. throw new ArgumentNullException("source");
  62. if (accumulator == null)
  63. throw new ArgumentNullException("accumulator");
  64. return source.Aggregate(seed, accumulator, x => x, cancellationToken);
  65. }
  66. public static Task<TSource> Aggregate<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
  67. {
  68. if (source == null)
  69. throw new ArgumentNullException("source");
  70. if (accumulator == null)
  71. throw new ArgumentNullException("accumulator");
  72. var tcs = new TaskCompletionSource<TSource>();
  73. var first = true;
  74. var acc = default(TSource);
  75. var e = source.GetEnumerator();
  76. var f = default(Action<CancellationToken>);
  77. f = ct => e.MoveNext(ct).ContinueWith(t =>
  78. {
  79. t.Handle(tcs, res =>
  80. {
  81. if (res)
  82. {
  83. try
  84. {
  85. if (first)
  86. acc = e.Current;
  87. else
  88. acc = accumulator(acc, e.Current);
  89. f(ct);
  90. }
  91. catch (Exception ex)
  92. {
  93. tcs.TrySetException(ex);
  94. }
  95. first = false;
  96. }
  97. else
  98. {
  99. if (first)
  100. tcs.TrySetException(new InvalidOperationException());
  101. else
  102. tcs.TrySetResult(acc);
  103. }
  104. });
  105. });
  106. f(cancellationToken);
  107. return tcs.Task.Finally(e.Dispose);
  108. }
  109. public static Task<int> Count<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  110. {
  111. if (source == null)
  112. throw new ArgumentNullException("source");
  113. return source.Aggregate(0, (c, _) => c + 1, cancellationToken);
  114. }
  115. public static Task<int> Count<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  116. {
  117. if (source == null)
  118. throw new ArgumentNullException("source");
  119. if (predicate == null)
  120. throw new ArgumentNullException("predicate");
  121. return source.Where(predicate).Aggregate(0, (c, _) => c + 1, cancellationToken);
  122. }
  123. public static Task<long> LongCount<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  124. {
  125. if (source == null)
  126. throw new ArgumentNullException("source");
  127. return source.Aggregate(0L, (c, _) => c + 1, cancellationToken);
  128. }
  129. public static Task<long> LongCount<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  130. {
  131. if (source == null)
  132. throw new ArgumentNullException("source");
  133. if (predicate == null)
  134. throw new ArgumentNullException("predicate");
  135. return source.Where(predicate).Aggregate(0L, (c, _) => c + 1, cancellationToken);
  136. }
  137. public static Task<bool> All<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  138. {
  139. if (source == null)
  140. throw new ArgumentNullException("source");
  141. if (predicate == null)
  142. throw new ArgumentNullException("predicate");
  143. var tcs = new TaskCompletionSource<bool>();
  144. var e = source.GetEnumerator();
  145. var f = default(Action<CancellationToken>);
  146. f = ct => e.MoveNext(ct).ContinueWith(t =>
  147. {
  148. t.Handle(tcs, res =>
  149. {
  150. if (res)
  151. {
  152. try
  153. {
  154. if (!predicate(e.Current))
  155. tcs.TrySetResult(false);
  156. else
  157. f(ct);
  158. }
  159. catch (Exception ex)
  160. {
  161. tcs.TrySetException(ex);
  162. }
  163. }
  164. else
  165. {
  166. tcs.TrySetResult(true);
  167. }
  168. });
  169. });
  170. f(cancellationToken);
  171. return tcs.Task.Finally(e.Dispose);
  172. }
  173. public static Task<bool> Any<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  174. {
  175. if (source == null)
  176. throw new ArgumentNullException("source");
  177. if (predicate == null)
  178. throw new ArgumentNullException("predicate");
  179. var tcs = new TaskCompletionSource<bool>();
  180. var e = source.GetEnumerator();
  181. var f = default(Action<CancellationToken>);
  182. f = ct => e.MoveNext(ct).ContinueWith(t =>
  183. {
  184. t.Handle(tcs, res =>
  185. {
  186. if (res)
  187. {
  188. try
  189. {
  190. if (predicate(e.Current))
  191. tcs.TrySetResult(true);
  192. else
  193. f(ct);
  194. }
  195. catch (Exception ex)
  196. {
  197. tcs.TrySetException(ex);
  198. }
  199. }
  200. else
  201. {
  202. tcs.TrySetResult(false);
  203. }
  204. });
  205. });
  206. f(cancellationToken);
  207. return tcs.Task.Finally(e.Dispose);
  208. }
  209. public static Task<bool> Any<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  210. {
  211. if (source == null)
  212. throw new ArgumentNullException("source");
  213. var e = source.GetEnumerator();
  214. return e.MoveNext(cancellationToken);
  215. }
  216. public static Task<bool> Contains<TSource>(this IAsyncEnumerable<TSource> source, TSource value, IEqualityComparer<TSource> comparer, CancellationToken cancellationToken)
  217. {
  218. if (source == null)
  219. throw new ArgumentNullException("source");
  220. if (comparer == null)
  221. throw new ArgumentNullException("comparer");
  222. return source.Any(x => comparer.Equals(x, value), cancellationToken);
  223. }
  224. public static Task<bool> Contains<TSource>(this IAsyncEnumerable<TSource> source, TSource value, CancellationToken cancellationToken)
  225. {
  226. if (source == null)
  227. throw new ArgumentNullException("source");
  228. return source.Contains(value, EqualityComparer<TSource>.Default, cancellationToken);
  229. }
  230. public static Task<TSource> First<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  231. {
  232. if (source == null)
  233. throw new ArgumentNullException("source");
  234. var tcs = new TaskCompletionSource<TSource>();
  235. var e = source.GetEnumerator();
  236. e.MoveNext(cancellationToken).ContinueWith(t =>
  237. {
  238. t.Handle(tcs, res =>
  239. {
  240. if (res)
  241. tcs.TrySetResult(e.Current);
  242. else
  243. tcs.TrySetException(new InvalidOperationException());
  244. });
  245. });
  246. return tcs.Task.Finally(e.Dispose);
  247. }
  248. public static Task<TSource> First<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  249. {
  250. if (source == null)
  251. throw new ArgumentNullException("source");
  252. if (predicate == null)
  253. throw new ArgumentNullException("predicate");
  254. return source.Where(predicate).First(cancellationToken);
  255. }
  256. public static Task<TSource> FirstOrDefault<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  257. {
  258. if (source == null)
  259. throw new ArgumentNullException("source");
  260. var tcs = new TaskCompletionSource<TSource>();
  261. var e = source.GetEnumerator();
  262. e.MoveNext(cancellationToken).ContinueWith(t =>
  263. {
  264. t.Handle(tcs, res =>
  265. {
  266. if (res)
  267. tcs.TrySetResult(e.Current);
  268. else
  269. tcs.TrySetResult(default(TSource));
  270. });
  271. });
  272. return tcs.Task.Finally(e.Dispose);
  273. }
  274. public static Task<TSource> FirstOrDefault<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  275. {
  276. if (source == null)
  277. throw new ArgumentNullException("source");
  278. if (predicate == null)
  279. throw new ArgumentNullException("predicate");
  280. return source.Where(predicate).FirstOrDefault(cancellationToken);
  281. }
  282. public static Task<TSource> Last<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  283. {
  284. if (source == null)
  285. throw new ArgumentNullException("source");
  286. var tcs = new TaskCompletionSource<TSource>();
  287. var e = source.GetEnumerator();
  288. var last = default(TSource);
  289. var hasLast = false;
  290. var f = default(Action<CancellationToken>);
  291. f = ct => e.MoveNext(ct).ContinueWith(t =>
  292. {
  293. t.Handle(tcs, res =>
  294. {
  295. if (res)
  296. {
  297. hasLast = true;
  298. last = e.Current;
  299. f(ct);
  300. }
  301. else
  302. {
  303. if (!hasLast)
  304. tcs.TrySetException(new InvalidOperationException());
  305. else
  306. tcs.TrySetResult(last);
  307. }
  308. });
  309. });
  310. f(cancellationToken);
  311. return tcs.Task.Finally(e.Dispose);
  312. }
  313. public static Task<TSource> Last<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  314. {
  315. if (source == null)
  316. throw new ArgumentNullException("source");
  317. if (predicate == null)
  318. throw new ArgumentNullException("predicate");
  319. return source.Where(predicate).Last(cancellationToken);
  320. }
  321. public static Task<TSource> LastOrDefault<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  322. {
  323. if (source == null)
  324. throw new ArgumentNullException("source");
  325. var tcs = new TaskCompletionSource<TSource>();
  326. var e = source.GetEnumerator();
  327. var last = default(TSource);
  328. var hasLast = false;
  329. var f = default(Action<CancellationToken>);
  330. f = ct => e.MoveNext(ct).ContinueWith(t =>
  331. {
  332. t.Handle(tcs, res =>
  333. {
  334. if (res)
  335. {
  336. hasLast = true;
  337. last = e.Current;
  338. f(ct);
  339. }
  340. else
  341. {
  342. if (!hasLast)
  343. tcs.TrySetResult(default(TSource));
  344. else
  345. tcs.TrySetResult(last);
  346. }
  347. });
  348. });
  349. f(cancellationToken);
  350. return tcs.Task.Finally(e.Dispose);
  351. }
  352. public static Task<TSource> LastOrDefault<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  353. {
  354. if (source == null)
  355. throw new ArgumentNullException("source");
  356. if (predicate == null)
  357. throw new ArgumentNullException("predicate");
  358. return source.Where(predicate).LastOrDefault(cancellationToken);
  359. }
  360. public static Task<TSource> Single<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  361. {
  362. if (source == null)
  363. throw new ArgumentNullException("source");
  364. var tcs = new TaskCompletionSource<TSource>();
  365. var e = source.GetEnumerator();
  366. e.MoveNext(cancellationToken).ContinueWith(t =>
  367. {
  368. t.Handle(tcs, res =>
  369. {
  370. if (res)
  371. {
  372. var result = e.Current;
  373. e.MoveNext(cancellationToken).ContinueWith(t1 =>
  374. {
  375. t1.Handle(tcs, res1 =>
  376. {
  377. if (res1)
  378. tcs.TrySetException(new InvalidOperationException());
  379. else
  380. tcs.TrySetResult(result);
  381. });
  382. });
  383. }
  384. else
  385. tcs.TrySetException(new InvalidOperationException());
  386. });
  387. });
  388. return tcs.Task.Finally(e.Dispose);
  389. }
  390. public static Task<TSource> Single<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  391. {
  392. if (source == null)
  393. throw new ArgumentNullException("source");
  394. if (predicate == null)
  395. throw new ArgumentNullException("predicate");
  396. return source.Where(predicate).Single(cancellationToken);
  397. }
  398. public static Task<TSource> SingleOrDefault<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  399. {
  400. if (source == null)
  401. throw new ArgumentNullException("source");
  402. var tcs = new TaskCompletionSource<TSource>();
  403. var e = source.GetEnumerator();
  404. e.MoveNext(cancellationToken).ContinueWith(t =>
  405. {
  406. t.Handle(tcs, res =>
  407. {
  408. if (res)
  409. {
  410. var result = e.Current;
  411. e.MoveNext(cancellationToken).ContinueWith(t1 =>
  412. {
  413. t1.Handle(tcs, res1 =>
  414. {
  415. if (res1)
  416. tcs.TrySetException(new InvalidOperationException());
  417. else
  418. tcs.TrySetResult(result);
  419. });
  420. });
  421. }
  422. else
  423. tcs.TrySetResult(default(TSource));
  424. });
  425. });
  426. return tcs.Task.Finally(e.Dispose);
  427. }
  428. public static Task<TSource> SingleOrDefault<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  429. {
  430. if (source == null)
  431. throw new ArgumentNullException("source");
  432. if (predicate == null)
  433. throw new ArgumentNullException("predicate");
  434. return source.Where(predicate).SingleOrDefault(cancellationToken);
  435. }
  436. public static Task<TSource> ElementAt<TSource>(this IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
  437. {
  438. if (source == null)
  439. throw new ArgumentNullException("source");
  440. if (index < 0)
  441. throw new ArgumentOutOfRangeException("index");
  442. var tcs = new TaskCompletionSource<TSource>();
  443. var e = source.GetEnumerator();
  444. var next = default(Action<CancellationToken>);
  445. next = ct => e.MoveNext(ct).ContinueWith(t =>
  446. {
  447. t.Handle(tcs, res =>
  448. {
  449. if (res)
  450. {
  451. if (index == 0)
  452. {
  453. tcs.TrySetResult(e.Current);
  454. }
  455. else
  456. {
  457. index--;
  458. next(ct);
  459. }
  460. }
  461. else
  462. {
  463. tcs.TrySetException(new ArgumentOutOfRangeException());
  464. }
  465. });
  466. });
  467. next(cancellationToken);
  468. return tcs.Task.Finally(e.Dispose);
  469. }
  470. public static Task<TSource> ElementAtOrDefault<TSource>(this IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
  471. {
  472. if (source == null)
  473. throw new ArgumentNullException("source");
  474. if (index < 0)
  475. throw new ArgumentOutOfRangeException("index");
  476. var tcs = new TaskCompletionSource<TSource>();
  477. var e = source.GetEnumerator();
  478. var next = default(Action<CancellationToken>);
  479. next = ct => e.MoveNext(ct).ContinueWith(t =>
  480. {
  481. t.Handle(tcs, res =>
  482. {
  483. if (res)
  484. {
  485. if (index == 0)
  486. {
  487. tcs.TrySetResult(e.Current);
  488. }
  489. else
  490. {
  491. index--;
  492. next(ct);
  493. }
  494. }
  495. else
  496. {
  497. tcs.TrySetResult(default(TSource));
  498. }
  499. });
  500. });
  501. next(cancellationToken);
  502. return tcs.Task.Finally(e.Dispose);
  503. }
  504. public static Task<TSource[]> ToArray<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  505. {
  506. if (source == null)
  507. throw new ArgumentNullException("source");
  508. return source.Aggregate(new List<TSource>(), (list, x) => { list.Add(x); return list; }, list => list.ToArray(), cancellationToken);
  509. }
  510. public static Task<List<TSource>> ToList<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  511. {
  512. if (source == null)
  513. throw new ArgumentNullException("source");
  514. return source.Aggregate(new List<TSource>(), (list, x) => { list.Add(x); return list; }, cancellationToken);
  515. }
  516. public static Task<Dictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  517. {
  518. if (source == null)
  519. throw new ArgumentNullException("source");
  520. if (keySelector == null)
  521. throw new ArgumentNullException("keySelector");
  522. if (elementSelector == null)
  523. throw new ArgumentNullException("elementSelector");
  524. if (comparer == null)
  525. throw new ArgumentNullException("comparer");
  526. return source.Aggregate(new Dictionary<TKey, TElement>(comparer), (d, x) => { d.Add(keySelector(x), elementSelector(x)); return d; }, cancellationToken);
  527. }
  528. public static Task<Dictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, CancellationToken cancellationToken)
  529. {
  530. if (source == null)
  531. throw new ArgumentNullException("source");
  532. if (keySelector == null)
  533. throw new ArgumentNullException("keySelector");
  534. if (elementSelector == null)
  535. throw new ArgumentNullException("elementSelector");
  536. return source.ToDictionary(keySelector, elementSelector, EqualityComparer<TKey>.Default, cancellationToken);
  537. }
  538. public static Task<Dictionary<TKey, TSource>> ToDictionary<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  539. {
  540. if (source == null)
  541. throw new ArgumentNullException("source");
  542. if (keySelector == null)
  543. throw new ArgumentNullException("keySelector");
  544. if (comparer == null)
  545. throw new ArgumentNullException("comparer");
  546. return source.ToDictionary(keySelector, x => x, comparer, cancellationToken);
  547. }
  548. public static Task<Dictionary<TKey, TSource>> ToDictionary<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
  549. {
  550. if (source == null)
  551. throw new ArgumentNullException("source");
  552. if (keySelector == null)
  553. throw new ArgumentNullException("keySelector");
  554. return source.ToDictionary(keySelector, x => x, EqualityComparer<TKey>.Default, cancellationToken);
  555. }
  556. public static Task<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  557. {
  558. if (source == null)
  559. throw new ArgumentNullException("source");
  560. if (keySelector == null)
  561. throw new ArgumentNullException("keySelector");
  562. if (elementSelector == null)
  563. throw new ArgumentNullException("elementSelector");
  564. if (comparer == null)
  565. throw new ArgumentNullException("comparer");
  566. return source.Aggregate(new Lookup<TKey, TElement>(comparer), (lookup, x) => { lookup.Add(keySelector(x), elementSelector(x)); return lookup; }, lookup => (ILookup<TKey, TElement>)lookup, cancellationToken);
  567. }
  568. public static Task<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, CancellationToken cancellationToken)
  569. {
  570. if (source == null)
  571. throw new ArgumentNullException("source");
  572. if (keySelector == null)
  573. throw new ArgumentNullException("keySelector");
  574. if (elementSelector == null)
  575. throw new ArgumentNullException("elementSelector");
  576. return source.ToLookup(keySelector, elementSelector, EqualityComparer<TKey>.Default, cancellationToken);
  577. }
  578. public static Task<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  579. {
  580. if (source == null)
  581. throw new ArgumentNullException("source");
  582. if (keySelector == null)
  583. throw new ArgumentNullException("keySelector");
  584. if (comparer == null)
  585. throw new ArgumentNullException("comparer");
  586. return source.ToLookup(keySelector, x => x, comparer, cancellationToken);
  587. }
  588. public static Task<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
  589. {
  590. if (source == null)
  591. throw new ArgumentNullException("source");
  592. if (keySelector == null)
  593. throw new ArgumentNullException("keySelector");
  594. return source.ToLookup(keySelector, x => x, EqualityComparer<TKey>.Default, cancellationToken);
  595. }
  596. class Lookup<TKey, TElement> : ILookup<TKey, TElement>
  597. {
  598. private readonly Dictionary<TKey, EnumerableGrouping<TKey, TElement>> map;
  599. public Lookup(IEqualityComparer<TKey> comparer)
  600. {
  601. map = new Dictionary<TKey, EnumerableGrouping<TKey, TElement>>(comparer);
  602. }
  603. public void Add(TKey key, TElement element)
  604. {
  605. var g = default(EnumerableGrouping<TKey, TElement>);
  606. if (!map.TryGetValue(key, out g))
  607. {
  608. g = new EnumerableGrouping<TKey, TElement>(key);
  609. map.Add(key, g);
  610. }
  611. g.Add(element);
  612. }
  613. public bool Contains(TKey key)
  614. {
  615. return map.ContainsKey(key);
  616. }
  617. public int Count
  618. {
  619. get { return map.Keys.Count; }
  620. }
  621. public IEnumerable<TElement> this[TKey key]
  622. {
  623. get { return map[key]; }
  624. }
  625. public IEnumerator<IGrouping<TKey, TElement>> GetEnumerator()
  626. {
  627. return map.Values.Cast<IGrouping<TKey, TElement>>().GetEnumerator();
  628. }
  629. System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  630. {
  631. return GetEnumerator();
  632. }
  633. }
  634. public static Task<double> Average(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
  635. {
  636. if (source == null)
  637. throw new ArgumentNullException("source");
  638. var tcs = new TaskCompletionSource<double>();
  639. var count = 0L;
  640. var sum = 0.0;
  641. var e = source.GetEnumerator();
  642. var f = default(Action<CancellationToken>);
  643. f = ct => e.MoveNext(ct).ContinueWith(t =>
  644. {
  645. t.Handle(tcs, res =>
  646. {
  647. if (res)
  648. {
  649. count++;
  650. sum += e.Current;
  651. f(ct);
  652. }
  653. else
  654. {
  655. if (count == 0)
  656. tcs.TrySetException(new InvalidOperationException());
  657. else
  658. tcs.TrySetResult(sum / count);
  659. }
  660. });
  661. });
  662. f(cancellationToken);
  663. return tcs.Task.Finally(e.Dispose);
  664. }
  665. public static Task<double?> Average(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
  666. {
  667. if (source == null)
  668. throw new ArgumentNullException("source");
  669. var tcs = new TaskCompletionSource<double?>();
  670. var count = 0L;
  671. var sum = 0.0;
  672. var e = source.GetEnumerator();
  673. var f = default(Action<CancellationToken>);
  674. f = ct => e.MoveNext(ct).ContinueWith(t =>
  675. {
  676. t.Handle(tcs, res =>
  677. {
  678. if (res)
  679. {
  680. if (e.Current.HasValue)
  681. {
  682. count++;
  683. sum += e.Current.Value;
  684. }
  685. f(ct);
  686. }
  687. else
  688. {
  689. if (count == 0)
  690. tcs.TrySetResult(null);
  691. else
  692. tcs.TrySetResult(sum / count);
  693. }
  694. });
  695. });
  696. f(cancellationToken);
  697. return tcs.Task.Finally(e.Dispose);
  698. }
  699. public static Task<double> Average(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
  700. {
  701. if (source == null)
  702. throw new ArgumentNullException("source");
  703. var tcs = new TaskCompletionSource<double>();
  704. var count = 0L;
  705. var sum = 0.0;
  706. var e = source.GetEnumerator();
  707. var f = default(Action<CancellationToken>);
  708. f = ct => e.MoveNext(ct).ContinueWith(t =>
  709. {
  710. t.Handle(tcs, res =>
  711. {
  712. if (res)
  713. {
  714. count++;
  715. sum += e.Current;
  716. f(ct);
  717. }
  718. else
  719. {
  720. if (count == 0)
  721. tcs.TrySetException(new InvalidOperationException());
  722. else
  723. tcs.TrySetResult(sum / count);
  724. }
  725. });
  726. });
  727. f(cancellationToken);
  728. return tcs.Task.Finally(e.Dispose);
  729. }
  730. public static Task<double?> Average(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
  731. {
  732. if (source == null)
  733. throw new ArgumentNullException("source");
  734. var tcs = new TaskCompletionSource<double?>();
  735. var count = 0L;
  736. var sum = 0.0;
  737. var e = source.GetEnumerator();
  738. var f = default(Action<CancellationToken>);
  739. f = ct => e.MoveNext(ct).ContinueWith(t =>
  740. {
  741. t.Handle(tcs, res =>
  742. {
  743. if (res)
  744. {
  745. if (e.Current.HasValue)
  746. {
  747. count++;
  748. sum += e.Current.Value;
  749. }
  750. f(ct);
  751. }
  752. else
  753. {
  754. if (count == 0)
  755. tcs.TrySetResult(null);
  756. else
  757. tcs.TrySetResult(sum / count);
  758. }
  759. });
  760. });
  761. f(cancellationToken);
  762. return tcs.Task.Finally(e.Dispose);
  763. }
  764. public static Task<double> Average(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
  765. {
  766. if (source == null)
  767. throw new ArgumentNullException("source");
  768. var tcs = new TaskCompletionSource<double>();
  769. var count = 0L;
  770. var sum = 0.0;
  771. var e = source.GetEnumerator();
  772. var f = default(Action<CancellationToken>);
  773. f = ct => e.MoveNext(ct).ContinueWith(t =>
  774. {
  775. t.Handle(tcs, res =>
  776. {
  777. if (res)
  778. {
  779. count++;
  780. sum += e.Current;
  781. f(ct);
  782. }
  783. else
  784. {
  785. if (count == 0)
  786. tcs.TrySetException(new InvalidOperationException());
  787. else
  788. tcs.TrySetResult(sum / count);
  789. }
  790. });
  791. });
  792. f(cancellationToken);
  793. return tcs.Task.Finally(e.Dispose);
  794. }
  795. public static Task<double?> Average(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
  796. {
  797. if (source == null)
  798. throw new ArgumentNullException("source");
  799. var tcs = new TaskCompletionSource<double?>();
  800. var count = 0L;
  801. var sum = 0.0;
  802. var e = source.GetEnumerator();
  803. var f = default(Action<CancellationToken>);
  804. f = ct => e.MoveNext(ct).ContinueWith(t =>
  805. {
  806. t.Handle(tcs, res =>
  807. {
  808. if (res)
  809. {
  810. if (e.Current.HasValue)
  811. {
  812. count++;
  813. sum += e.Current.Value;
  814. }
  815. f(ct);
  816. }
  817. else
  818. {
  819. if (count == 0)
  820. tcs.TrySetResult(null);
  821. else
  822. tcs.TrySetResult(sum / count);
  823. }
  824. });
  825. });
  826. f(cancellationToken);
  827. return tcs.Task.Finally(e.Dispose);
  828. }
  829. public static Task<float> Average(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
  830. {
  831. if (source == null)
  832. throw new ArgumentNullException("source");
  833. var tcs = new TaskCompletionSource<float>();
  834. var count = 0L;
  835. var sum = 0f;
  836. var e = source.GetEnumerator();
  837. var f = default(Action<CancellationToken>);
  838. f = ct => e.MoveNext(ct).ContinueWith(t =>
  839. {
  840. t.Handle(tcs, res =>
  841. {
  842. if (res)
  843. {
  844. count++;
  845. sum += e.Current;
  846. f(ct);
  847. }
  848. else
  849. {
  850. if (count == 0)
  851. tcs.TrySetException(new InvalidOperationException());
  852. else
  853. tcs.TrySetResult(sum / count);
  854. }
  855. });
  856. });
  857. f(cancellationToken);
  858. return tcs.Task.Finally(e.Dispose);
  859. }
  860. public static Task<float?> Average(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
  861. {
  862. if (source == null)
  863. throw new ArgumentNullException("source");
  864. var tcs = new TaskCompletionSource<float?>();
  865. var count = 0L;
  866. var sum = 0f;
  867. var e = source.GetEnumerator();
  868. var f = default(Action<CancellationToken>);
  869. f = ct => e.MoveNext(ct).ContinueWith(t =>
  870. {
  871. t.Handle(tcs, res =>
  872. {
  873. if (res)
  874. {
  875. if (e.Current.HasValue)
  876. {
  877. count++;
  878. sum += e.Current.Value;
  879. }
  880. f(ct);
  881. }
  882. else
  883. {
  884. if (count == 0)
  885. tcs.TrySetResult(null);
  886. else
  887. tcs.TrySetResult(sum / count);
  888. }
  889. });
  890. });
  891. f(cancellationToken);
  892. return tcs.Task.Finally(e.Dispose);
  893. }
  894. public static Task<decimal> Average(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
  895. {
  896. if (source == null)
  897. throw new ArgumentNullException("source");
  898. var tcs = new TaskCompletionSource<decimal>();
  899. var count = 0L;
  900. var sum = 0m;
  901. var e = source.GetEnumerator();
  902. var f = default(Action<CancellationToken>);
  903. f = ct => e.MoveNext(ct).ContinueWith(t =>
  904. {
  905. t.Handle(tcs, res =>
  906. {
  907. if (res)
  908. {
  909. count++;
  910. sum += e.Current;
  911. f(ct);
  912. }
  913. else
  914. {
  915. if (count == 0)
  916. tcs.TrySetException(new InvalidOperationException());
  917. else
  918. tcs.TrySetResult(sum / count);
  919. }
  920. });
  921. });
  922. f(cancellationToken);
  923. return tcs.Task.Finally(e.Dispose);
  924. }
  925. public static Task<decimal?> Average(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
  926. {
  927. if (source == null)
  928. throw new ArgumentNullException("source");
  929. var tcs = new TaskCompletionSource<decimal?>();
  930. var count = 0L;
  931. var sum = 0m;
  932. var e = source.GetEnumerator();
  933. var f = default(Action<CancellationToken>);
  934. f = ct => e.MoveNext(ct).ContinueWith(t =>
  935. {
  936. t.Handle(tcs, res =>
  937. {
  938. if (res)
  939. {
  940. if (e.Current.HasValue)
  941. {
  942. count++;
  943. sum += e.Current.Value;
  944. }
  945. f(ct);
  946. }
  947. else
  948. {
  949. if (count == 0)
  950. tcs.TrySetResult(null);
  951. else
  952. tcs.TrySetResult(sum / count);
  953. }
  954. });
  955. });
  956. f(cancellationToken);
  957. return tcs.Task.Finally(e.Dispose);
  958. }
  959. public static Task<double?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
  960. {
  961. if (source == null)
  962. throw new ArgumentNullException("source");
  963. if (selector == null)
  964. throw new ArgumentNullException("selector");
  965. return source.Select(selector).Average(cancellationToken);
  966. }
  967. public static Task<double> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
  968. {
  969. if (source == null)
  970. throw new ArgumentNullException("source");
  971. if (selector == null)
  972. throw new ArgumentNullException("selector");
  973. return source.Select(selector).Average(cancellationToken);
  974. }
  975. public static Task<double> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
  976. {
  977. if (source == null)
  978. throw new ArgumentNullException("source");
  979. if (selector == null)
  980. throw new ArgumentNullException("selector");
  981. return source.Select(selector).Average(cancellationToken);
  982. }
  983. public static Task<double?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
  984. {
  985. if (source == null)
  986. throw new ArgumentNullException("source");
  987. if (selector == null)
  988. throw new ArgumentNullException("selector");
  989. return source.Select(selector).Average(cancellationToken);
  990. }
  991. public static Task<double> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
  992. {
  993. if (source == null)
  994. throw new ArgumentNullException("source");
  995. if (selector == null)
  996. throw new ArgumentNullException("selector");
  997. return source.Select(selector).Average(cancellationToken);
  998. }
  999. public static Task<double?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
  1000. {
  1001. if (source == null)
  1002. throw new ArgumentNullException("source");
  1003. if (selector == null)
  1004. throw new ArgumentNullException("selector");
  1005. return source.Select(selector).Average(cancellationToken);
  1006. }
  1007. public static Task<float> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
  1008. {
  1009. if (source == null)
  1010. throw new ArgumentNullException("source");
  1011. if (selector == null)
  1012. throw new ArgumentNullException("selector");
  1013. return source.Select(selector).Average(cancellationToken);
  1014. }
  1015. public static Task<float?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
  1016. {
  1017. if (source == null)
  1018. throw new ArgumentNullException("source");
  1019. if (selector == null)
  1020. throw new ArgumentNullException("selector");
  1021. return source.Select(selector).Average(cancellationToken);
  1022. }
  1023. public static Task<decimal> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
  1024. {
  1025. if (source == null)
  1026. throw new ArgumentNullException("source");
  1027. if (selector == null)
  1028. throw new ArgumentNullException("selector");
  1029. return source.Select(selector).Average(cancellationToken);
  1030. }
  1031. public static Task<decimal?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
  1032. {
  1033. if (source == null)
  1034. throw new ArgumentNullException("source");
  1035. if (selector == null)
  1036. throw new ArgumentNullException("selector");
  1037. return source.Select(selector).Average(cancellationToken);
  1038. }
  1039. public static Task<int> Max(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
  1040. {
  1041. if (source == null)
  1042. throw new ArgumentNullException("source");
  1043. return source.Aggregate(Math.Max, cancellationToken);
  1044. }
  1045. public static Task<long> Max(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
  1046. {
  1047. if (source == null)
  1048. throw new ArgumentNullException("source");
  1049. return source.Aggregate(Math.Max, cancellationToken);
  1050. }
  1051. public static Task<double> Max(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
  1052. {
  1053. if (source == null)
  1054. throw new ArgumentNullException("source");
  1055. return source.Aggregate(Math.Max, cancellationToken);
  1056. }
  1057. public static Task<float> Max(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
  1058. {
  1059. if (source == null)
  1060. throw new ArgumentNullException("source");
  1061. return source.Aggregate(Math.Max, cancellationToken);
  1062. }
  1063. public static Task<decimal> Max(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
  1064. {
  1065. if (source == null)
  1066. throw new ArgumentNullException("source");
  1067. return source.Aggregate(Math.Max, cancellationToken);
  1068. }
  1069. static T? NullableMax<T>(T? x, T? y)
  1070. where T : struct, IComparable<T>
  1071. {
  1072. if (!x.HasValue)
  1073. return y;
  1074. if (!y.HasValue)
  1075. return x;
  1076. if (x.Value.CompareTo(y.Value) >= 0)
  1077. return x;
  1078. return y;
  1079. }
  1080. public static Task<int?> Max(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
  1081. {
  1082. if (source == null)
  1083. throw new ArgumentNullException("source");
  1084. return source.Aggregate(default(int?), NullableMax, cancellationToken);
  1085. }
  1086. public static Task<long?> Max(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
  1087. {
  1088. if (source == null)
  1089. throw new ArgumentNullException("source");
  1090. return source.Aggregate(default(long?), NullableMax, cancellationToken);
  1091. }
  1092. public static Task<double?> Max(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
  1093. {
  1094. if (source == null)
  1095. throw new ArgumentNullException("source");
  1096. return source.Aggregate(default(double?), NullableMax, cancellationToken);
  1097. }
  1098. public static Task<float?> Max(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
  1099. {
  1100. if (source == null)
  1101. throw new ArgumentNullException("source");
  1102. return source.Aggregate(default(float?), NullableMax, cancellationToken);
  1103. }
  1104. public static Task<decimal?> Max(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
  1105. {
  1106. if (source == null)
  1107. throw new ArgumentNullException("source");
  1108. return source.Aggregate(default(decimal?), NullableMax, cancellationToken);
  1109. }
  1110. public static Task<TSource> Max<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  1111. {
  1112. if (source == null)
  1113. throw new ArgumentNullException("source");
  1114. var comparer = Comparer<TSource>.Default;
  1115. return source.Aggregate((x, y) => comparer.Compare(x, y) >= 0 ? x : y, cancellationToken);
  1116. }
  1117. public static Task<int> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
  1118. {
  1119. if (source == null)
  1120. throw new ArgumentNullException("source");
  1121. if (selector == null)
  1122. throw new ArgumentNullException("selector");
  1123. return source.Select(selector).Max(cancellationToken);
  1124. }
  1125. public static Task<long> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
  1126. {
  1127. if (source == null)
  1128. throw new ArgumentNullException("source");
  1129. if (selector == null)
  1130. throw new ArgumentNullException("selector");
  1131. return source.Select(selector).Max(cancellationToken);
  1132. }
  1133. public static Task<double> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
  1134. {
  1135. if (source == null)
  1136. throw new ArgumentNullException("source");
  1137. if (selector == null)
  1138. throw new ArgumentNullException("selector");
  1139. return source.Select(selector).Max(cancellationToken);
  1140. }
  1141. public static Task<float> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
  1142. {
  1143. if (source == null)
  1144. throw new ArgumentNullException("source");
  1145. if (selector == null)
  1146. throw new ArgumentNullException("selector");
  1147. return source.Select(selector).Max(cancellationToken);
  1148. }
  1149. public static Task<decimal> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
  1150. {
  1151. if (source == null)
  1152. throw new ArgumentNullException("source");
  1153. if (selector == null)
  1154. throw new ArgumentNullException("selector");
  1155. return source.Select(selector).Max(cancellationToken);
  1156. }
  1157. public static Task<int?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
  1158. {
  1159. if (source == null)
  1160. throw new ArgumentNullException("source");
  1161. if (selector == null)
  1162. throw new ArgumentNullException("selector");
  1163. return source.Select(selector).Max(cancellationToken);
  1164. }
  1165. public static Task<long?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
  1166. {
  1167. if (source == null)
  1168. throw new ArgumentNullException("source");
  1169. if (selector == null)
  1170. throw new ArgumentNullException("selector");
  1171. return source.Select(selector).Max(cancellationToken);
  1172. }
  1173. public static Task<double?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
  1174. {
  1175. if (source == null)
  1176. throw new ArgumentNullException("source");
  1177. if (selector == null)
  1178. throw new ArgumentNullException("selector");
  1179. return source.Select(selector).Max(cancellationToken);
  1180. }
  1181. public static Task<float?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
  1182. {
  1183. if (source == null)
  1184. throw new ArgumentNullException("source");
  1185. if (selector == null)
  1186. throw new ArgumentNullException("selector");
  1187. return source.Select(selector).Max(cancellationToken);
  1188. }
  1189. public static Task<decimal?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
  1190. {
  1191. if (source == null)
  1192. throw new ArgumentNullException("source");
  1193. if (selector == null)
  1194. throw new ArgumentNullException("selector");
  1195. return source.Select(selector).Max(cancellationToken);
  1196. }
  1197. public static Task<TResult> Max<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector, CancellationToken cancellationToken)
  1198. {
  1199. if (source == null)
  1200. throw new ArgumentNullException("source");
  1201. if (selector == null)
  1202. throw new ArgumentNullException("selector");
  1203. return source.Select(selector).Max(cancellationToken);
  1204. }
  1205. public static Task<int> Min(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
  1206. {
  1207. if (source == null)
  1208. throw new ArgumentNullException("source");
  1209. return source.Aggregate(Math.Min, cancellationToken);
  1210. }
  1211. public static Task<long> Min(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
  1212. {
  1213. if (source == null)
  1214. throw new ArgumentNullException("source");
  1215. return source.Aggregate(Math.Min, cancellationToken);
  1216. }
  1217. public static Task<double> Min(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
  1218. {
  1219. if (source == null)
  1220. throw new ArgumentNullException("source");
  1221. return source.Aggregate(Math.Min, cancellationToken);
  1222. }
  1223. public static Task<float> Min(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
  1224. {
  1225. if (source == null)
  1226. throw new ArgumentNullException("source");
  1227. return source.Aggregate(Math.Min, cancellationToken);
  1228. }
  1229. public static Task<decimal> Min(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
  1230. {
  1231. if (source == null)
  1232. throw new ArgumentNullException("source");
  1233. return source.Aggregate(Math.Min, cancellationToken);
  1234. }
  1235. static T? NullableMin<T>(T? x, T? y)
  1236. where T : struct, IComparable<T>
  1237. {
  1238. if (!x.HasValue)
  1239. return y;
  1240. if (!y.HasValue)
  1241. return x;
  1242. if (x.Value.CompareTo(y.Value) <= 0)
  1243. return x;
  1244. return y;
  1245. }
  1246. public static Task<int?> Min(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
  1247. {
  1248. if (source == null)
  1249. throw new ArgumentNullException("source");
  1250. return source.Aggregate(default(int?), NullableMin, cancellationToken);
  1251. }
  1252. public static Task<long?> Min(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
  1253. {
  1254. if (source == null)
  1255. throw new ArgumentNullException("source");
  1256. return source.Aggregate(default(long?), NullableMin, cancellationToken);
  1257. }
  1258. public static Task<double?> Min(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
  1259. {
  1260. if (source == null)
  1261. throw new ArgumentNullException("source");
  1262. return source.Aggregate(default(double?), NullableMin, cancellationToken);
  1263. }
  1264. public static Task<float?> Min(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
  1265. {
  1266. if (source == null)
  1267. throw new ArgumentNullException("source");
  1268. return source.Aggregate(default(float?), NullableMin, cancellationToken);
  1269. }
  1270. public static Task<decimal?> Min(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
  1271. {
  1272. if (source == null)
  1273. throw new ArgumentNullException("source");
  1274. return source.Aggregate(default(decimal?), NullableMin, cancellationToken);
  1275. }
  1276. public static Task<TSource> Min<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  1277. {
  1278. if (source == null)
  1279. throw new ArgumentNullException("source");
  1280. var comparer = Comparer<TSource>.Default;
  1281. return source.Aggregate((x, y) => comparer.Compare(x, y) <= 0 ? x : y, cancellationToken);
  1282. }
  1283. public static Task<int> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
  1284. {
  1285. if (source == null)
  1286. throw new ArgumentNullException("source");
  1287. if (selector == null)
  1288. throw new ArgumentNullException("selector");
  1289. return source.Select(selector).Min(cancellationToken);
  1290. }
  1291. public static Task<long> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
  1292. {
  1293. if (source == null)
  1294. throw new ArgumentNullException("source");
  1295. if (selector == null)
  1296. throw new ArgumentNullException("selector");
  1297. return source.Select(selector).Min(cancellationToken);
  1298. }
  1299. public static Task<double> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
  1300. {
  1301. if (source == null)
  1302. throw new ArgumentNullException("source");
  1303. if (selector == null)
  1304. throw new ArgumentNullException("selector");
  1305. return source.Select(selector).Min(cancellationToken);
  1306. }
  1307. public static Task<float> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
  1308. {
  1309. if (source == null)
  1310. throw new ArgumentNullException("source");
  1311. if (selector == null)
  1312. throw new ArgumentNullException("selector");
  1313. return source.Select(selector).Min(cancellationToken);
  1314. }
  1315. public static Task<decimal> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
  1316. {
  1317. if (source == null)
  1318. throw new ArgumentNullException("source");
  1319. if (selector == null)
  1320. throw new ArgumentNullException("selector");
  1321. return source.Select(selector).Min(cancellationToken);
  1322. }
  1323. public static Task<int?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
  1324. {
  1325. if (source == null)
  1326. throw new ArgumentNullException("source");
  1327. if (selector == null)
  1328. throw new ArgumentNullException("selector");
  1329. return source.Select(selector).Min(cancellationToken);
  1330. }
  1331. public static Task<long?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
  1332. {
  1333. if (source == null)
  1334. throw new ArgumentNullException("source");
  1335. if (selector == null)
  1336. throw new ArgumentNullException("selector");
  1337. return source.Select(selector).Min(cancellationToken);
  1338. }
  1339. public static Task<double?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
  1340. {
  1341. if (source == null)
  1342. throw new ArgumentNullException("source");
  1343. if (selector == null)
  1344. throw new ArgumentNullException("selector");
  1345. return source.Select(selector).Min(cancellationToken);
  1346. }
  1347. public static Task<float?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
  1348. {
  1349. if (source == null)
  1350. throw new ArgumentNullException("source");
  1351. if (selector == null)
  1352. throw new ArgumentNullException("selector");
  1353. return source.Select(selector).Min(cancellationToken);
  1354. }
  1355. public static Task<decimal?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
  1356. {
  1357. if (source == null)
  1358. throw new ArgumentNullException("source");
  1359. if (selector == null)
  1360. throw new ArgumentNullException("selector");
  1361. return source.Select(selector).Min(cancellationToken);
  1362. }
  1363. public static Task<TResult> Min<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector, CancellationToken cancellationToken)
  1364. {
  1365. if (source == null)
  1366. throw new ArgumentNullException("source");
  1367. if (selector == null)
  1368. throw new ArgumentNullException("selector");
  1369. return source.Select(selector).Min(cancellationToken);
  1370. }
  1371. public static Task<int> Sum(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
  1372. {
  1373. if (source == null)
  1374. throw new ArgumentNullException("source");
  1375. return source.Aggregate(0, (x, y) => x + y, cancellationToken);
  1376. }
  1377. public static Task<long> Sum(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
  1378. {
  1379. if (source == null)
  1380. throw new ArgumentNullException("source");
  1381. return source.Aggregate(0L, (x, y) => x + y, cancellationToken);
  1382. }
  1383. public static Task<double> Sum(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
  1384. {
  1385. if (source == null)
  1386. throw new ArgumentNullException("source");
  1387. return source.Aggregate(0.0, (x, y) => x + y, cancellationToken);
  1388. }
  1389. public static Task<float> Sum(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
  1390. {
  1391. if (source == null)
  1392. throw new ArgumentNullException("source");
  1393. return source.Aggregate(0f, (x, y) => x + y, cancellationToken);
  1394. }
  1395. public static Task<decimal> Sum(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
  1396. {
  1397. if (source == null)
  1398. throw new ArgumentNullException("source");
  1399. return source.Aggregate(0m, (x, y) => x + y, cancellationToken);
  1400. }
  1401. public static Task<int?> Sum(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
  1402. {
  1403. if (source == null)
  1404. throw new ArgumentNullException("source");
  1405. return source.Aggregate((int?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1406. }
  1407. public static Task<long?> Sum(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
  1408. {
  1409. if (source == null)
  1410. throw new ArgumentNullException("source");
  1411. return source.Aggregate((long?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1412. }
  1413. public static Task<double?> Sum(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
  1414. {
  1415. if (source == null)
  1416. throw new ArgumentNullException("source");
  1417. return source.Aggregate((double?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1418. }
  1419. public static Task<float?> Sum(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
  1420. {
  1421. if (source == null)
  1422. throw new ArgumentNullException("source");
  1423. return source.Aggregate((float?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1424. }
  1425. public static Task<decimal?> Sum(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
  1426. {
  1427. if (source == null)
  1428. throw new ArgumentNullException("source");
  1429. return source.Aggregate((decimal?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1430. }
  1431. public static Task<int> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
  1432. {
  1433. if (source == null)
  1434. throw new ArgumentNullException("source");
  1435. if (selector == null)
  1436. throw new ArgumentNullException("selector");
  1437. return source.Select(selector).Sum(cancellationToken);
  1438. }
  1439. public static Task<long> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
  1440. {
  1441. if (source == null)
  1442. throw new ArgumentNullException("source");
  1443. if (selector == null)
  1444. throw new ArgumentNullException("selector");
  1445. return source.Select(selector).Sum(cancellationToken);
  1446. }
  1447. public static Task<double> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
  1448. {
  1449. if (source == null)
  1450. throw new ArgumentNullException("source");
  1451. if (selector == null)
  1452. throw new ArgumentNullException("selector");
  1453. return source.Select(selector).Sum(cancellationToken);
  1454. }
  1455. public static Task<float> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
  1456. {
  1457. if (source == null)
  1458. throw new ArgumentNullException("source");
  1459. if (selector == null)
  1460. throw new ArgumentNullException("selector");
  1461. return source.Select(selector).Sum(cancellationToken);
  1462. }
  1463. public static Task<decimal> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
  1464. {
  1465. if (source == null)
  1466. throw new ArgumentNullException("source");
  1467. if (selector == null)
  1468. throw new ArgumentNullException("selector");
  1469. return source.Select(selector).Sum(cancellationToken);
  1470. }
  1471. public static Task<int?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
  1472. {
  1473. if (source == null)
  1474. throw new ArgumentNullException("source");
  1475. if (selector == null)
  1476. throw new ArgumentNullException("selector");
  1477. return source.Select(selector).Sum(cancellationToken);
  1478. }
  1479. public static Task<long?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
  1480. {
  1481. if (source == null)
  1482. throw new ArgumentNullException("source");
  1483. if (selector == null)
  1484. throw new ArgumentNullException("selector");
  1485. return source.Select(selector).Sum(cancellationToken);
  1486. }
  1487. public static Task<double?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
  1488. {
  1489. if (source == null)
  1490. throw new ArgumentNullException("source");
  1491. if (selector == null)
  1492. throw new ArgumentNullException("selector");
  1493. return source.Select(selector).Sum(cancellationToken);
  1494. }
  1495. public static Task<float?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
  1496. {
  1497. if (source == null)
  1498. throw new ArgumentNullException("source");
  1499. if (selector == null)
  1500. throw new ArgumentNullException("selector");
  1501. return source.Select(selector).Sum(cancellationToken);
  1502. }
  1503. public static Task<decimal?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
  1504. {
  1505. if (source == null)
  1506. throw new ArgumentNullException("source");
  1507. if (selector == null)
  1508. throw new ArgumentNullException("selector");
  1509. return source.Select(selector).Sum(cancellationToken);
  1510. }
  1511. public static Task<bool> IsEmpty<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  1512. {
  1513. if (source == null)
  1514. throw new ArgumentNullException("source");
  1515. return source.Any(cancellationToken).ContinueWith(t => !t.Result);
  1516. }
  1517. public static Task<TSource> Min<TSource>(this IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
  1518. {
  1519. if (source == null)
  1520. throw new ArgumentNullException("source");
  1521. if (comparer == null)
  1522. throw new ArgumentNullException("comparer");
  1523. return MinBy(source, x => x, comparer, cancellationToken).ContinueWith(t => t.Result.First());
  1524. }
  1525. public static Task<IList<TSource>> MinBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
  1526. {
  1527. if (source == null)
  1528. throw new ArgumentNullException("source");
  1529. if (keySelector == null)
  1530. throw new ArgumentNullException("keySelector");
  1531. return MinBy(source, keySelector, Comparer<TKey>.Default, cancellationToken);
  1532. }
  1533. public static Task<IList<TSource>> MinBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
  1534. {
  1535. if (source == null)
  1536. throw new ArgumentNullException("source");
  1537. if (keySelector == null)
  1538. throw new ArgumentNullException("keySelector");
  1539. if (comparer == null)
  1540. throw new ArgumentNullException("comparer");
  1541. return ExtremaBy(source, keySelector, (key, minValue) => -comparer.Compare(key, minValue), cancellationToken);
  1542. }
  1543. public static Task<TSource> Max<TSource>(this IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
  1544. {
  1545. if (source == null)
  1546. throw new ArgumentNullException("source");
  1547. if (comparer == null)
  1548. throw new ArgumentNullException("comparer");
  1549. return MaxBy(source, x => x, comparer, cancellationToken).ContinueWith(t => t.Result.First());
  1550. }
  1551. public static Task<IList<TSource>> MaxBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
  1552. {
  1553. if (source == null)
  1554. throw new ArgumentNullException("source");
  1555. if (keySelector == null)
  1556. throw new ArgumentNullException("keySelector");
  1557. return MaxBy(source, keySelector, Comparer<TKey>.Default, cancellationToken);
  1558. }
  1559. public static Task<IList<TSource>> MaxBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
  1560. {
  1561. if (source == null)
  1562. throw new ArgumentNullException("source");
  1563. if (keySelector == null)
  1564. throw new ArgumentNullException("keySelector");
  1565. if (comparer == null)
  1566. throw new ArgumentNullException("comparer");
  1567. return ExtremaBy(source, keySelector, (key, minValue) => comparer.Compare(key, minValue), cancellationToken);
  1568. }
  1569. private static Task<IList<TSource>> ExtremaBy<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TKey, TKey, int> compare, CancellationToken cancellationToken)
  1570. {
  1571. var tcs = new TaskCompletionSource<IList<TSource>>();
  1572. var result = new List<TSource>();
  1573. var hasFirst = false;
  1574. var current = default(TSource);
  1575. var resKey = default(TKey);
  1576. var e = source.GetEnumerator();
  1577. var f = default(Action<CancellationToken>);
  1578. f = ct => e.MoveNext(ct).ContinueWith(t =>
  1579. {
  1580. t.Handle(tcs, res =>
  1581. {
  1582. if (!hasFirst)
  1583. {
  1584. if (!res)
  1585. {
  1586. tcs.TrySetException(new InvalidOperationException("Source sequence doesn't contain any elements."));
  1587. return;
  1588. }
  1589. current = e.Current;
  1590. try
  1591. {
  1592. resKey = keySelector(current);
  1593. }
  1594. catch (Exception ex)
  1595. {
  1596. tcs.TrySetException(ex);
  1597. return;
  1598. }
  1599. result.Add(current);
  1600. hasFirst = true;
  1601. f(ct);
  1602. }
  1603. else
  1604. {
  1605. if (res)
  1606. {
  1607. var key = default(TKey);
  1608. var cmp = default(int);
  1609. try
  1610. {
  1611. current = e.Current;
  1612. key = keySelector(current);
  1613. cmp = compare(key, resKey);
  1614. }
  1615. catch (Exception ex)
  1616. {
  1617. tcs.TrySetException(ex);
  1618. return;
  1619. }
  1620. if (cmp == 0)
  1621. {
  1622. result.Add(current);
  1623. }
  1624. else if (cmp > 0)
  1625. {
  1626. result = new List<TSource> { current };
  1627. resKey = key;
  1628. }
  1629. f(ct);
  1630. }
  1631. else
  1632. {
  1633. tcs.TrySetResult(result);
  1634. }
  1635. }
  1636. });
  1637. });
  1638. f(cancellationToken);
  1639. return tcs.Task.Finally(e.Dispose);
  1640. }
  1641. }
  1642. }