AsyncEnumerable.Aggregates.cs 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Threading.Tasks;
  7. using System.Threading;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. public static Task<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
  13. {
  14. if (source == null)
  15. throw new ArgumentNullException(nameof(source));
  16. if (accumulator == null)
  17. throw new ArgumentNullException(nameof(accumulator));
  18. if (resultSelector == null)
  19. throw new ArgumentNullException(nameof(resultSelector));
  20. return Aggregate_(source, seed, accumulator, resultSelector, cancellationToken);
  21. }
  22. private static async Task<TResult> Aggregate_<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed,
  23. Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
  24. {
  25. var acc = seed;
  26. using (var e = source.GetEnumerator())
  27. {
  28. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  29. {
  30. acc = accumulator(acc, e.Current);
  31. }
  32. }
  33. var result = resultSelector(acc);
  34. return result;
  35. }
  36. public static Task<TAccumulate> Aggregate<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken)
  37. {
  38. if (source == null)
  39. throw new ArgumentNullException(nameof(source));
  40. if (accumulator == null)
  41. throw new ArgumentNullException(nameof(accumulator));
  42. return source.Aggregate(seed, accumulator, x => x, cancellationToken);
  43. }
  44. public static Task<TSource> Aggregate<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
  45. {
  46. if (source == null)
  47. throw new ArgumentNullException(nameof(source));
  48. if (accumulator == null)
  49. throw new ArgumentNullException(nameof(accumulator));
  50. return Aggregate_(source, accumulator, cancellationToken);
  51. }
  52. private static async Task<TSource> Aggregate_<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
  53. {
  54. var first = true;
  55. var acc = default(TSource);
  56. using (var e = source.GetEnumerator())
  57. {
  58. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  59. {
  60. acc = first ? e.Current : accumulator(acc, e.Current);
  61. first = false;
  62. }
  63. }
  64. if (first)
  65. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  66. return acc;
  67. }
  68. public static Task<int> Count<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  69. {
  70. if (source == null)
  71. throw new ArgumentNullException(nameof(source));
  72. return source.Aggregate(0, (c, _) => checked(c + 1), cancellationToken);
  73. }
  74. public static Task<int> Count<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  75. {
  76. if (source == null)
  77. throw new ArgumentNullException(nameof(source));
  78. if (predicate == null)
  79. throw new ArgumentNullException(nameof(predicate));
  80. return source.Where(predicate).Aggregate(0, (c, _) => checked(c + 1), cancellationToken);
  81. }
  82. public static Task<long> LongCount<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  83. {
  84. if (source == null)
  85. throw new ArgumentNullException(nameof(source));
  86. return source.Aggregate(0L, (c, _) => checked(c + 1), cancellationToken);
  87. }
  88. public static Task<long> LongCount<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  89. {
  90. if (source == null)
  91. throw new ArgumentNullException(nameof(source));
  92. if (predicate == null)
  93. throw new ArgumentNullException(nameof(predicate));
  94. return source.Where(predicate).Aggregate(0L, (c, _) => checked(c + 1), cancellationToken);
  95. }
  96. public static Task<bool> All<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  97. {
  98. if (source == null)
  99. throw new ArgumentNullException(nameof(source));
  100. if (predicate == null)
  101. throw new ArgumentNullException(nameof(predicate));
  102. return All_(source, predicate, cancellationToken);
  103. }
  104. private static async Task<bool> All_<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  105. {
  106. using (var e = source.GetEnumerator())
  107. {
  108. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  109. {
  110. if (!predicate(e.Current))
  111. return false;
  112. }
  113. }
  114. return true;
  115. }
  116. public static Task<bool> Any<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  117. {
  118. if (source == null)
  119. throw new ArgumentNullException(nameof(source));
  120. if (predicate == null)
  121. throw new ArgumentNullException(nameof(predicate));
  122. return Any_(source, predicate, cancellationToken);
  123. }
  124. private static async Task<bool> Any_<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  125. {
  126. using (var e = source.GetEnumerator())
  127. {
  128. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  129. {
  130. if (predicate(e.Current))
  131. return true;
  132. }
  133. }
  134. return false;
  135. }
  136. public static Task<bool> Any<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  137. {
  138. if (source == null)
  139. throw new ArgumentNullException(nameof(source));
  140. var e = source.GetEnumerator();
  141. return e.MoveNext(cancellationToken);
  142. }
  143. public static Task<bool> Contains<TSource>(this IAsyncEnumerable<TSource> source, TSource value, IEqualityComparer<TSource> comparer, CancellationToken cancellationToken)
  144. {
  145. if (source == null)
  146. throw new ArgumentNullException(nameof(source));
  147. if (comparer == null)
  148. throw new ArgumentNullException(nameof(comparer));
  149. return source.Any(x => comparer.Equals(x, value), cancellationToken);
  150. }
  151. public static Task<bool> Contains<TSource>(this IAsyncEnumerable<TSource> source, TSource value, CancellationToken cancellationToken)
  152. {
  153. if (source == null)
  154. throw new ArgumentNullException(nameof(source));
  155. return source.Contains(value, EqualityComparer<TSource>.Default, cancellationToken);
  156. }
  157. public static Task<TSource> First<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  158. {
  159. if (source == null)
  160. throw new ArgumentNullException(nameof(source));
  161. return First_(source, cancellationToken);
  162. }
  163. private static async Task<TSource> First_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  164. {
  165. using (var e = source.GetEnumerator())
  166. {
  167. if (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  168. {
  169. return e.Current;
  170. }
  171. }
  172. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  173. }
  174. public static Task<TSource> First<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  175. {
  176. if (source == null)
  177. throw new ArgumentNullException(nameof(source));
  178. if (predicate == null)
  179. throw new ArgumentNullException(nameof(predicate));
  180. return source.Where(predicate).First(cancellationToken);
  181. }
  182. public static Task<TSource> FirstOrDefault<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  183. {
  184. if (source == null)
  185. throw new ArgumentNullException(nameof(source));
  186. return FirstOrDefault_(source, cancellationToken);
  187. }
  188. private static async Task<TSource> FirstOrDefault_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  189. {
  190. using (var e = source.GetEnumerator())
  191. {
  192. if (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  193. {
  194. return e.Current;
  195. }
  196. }
  197. return default(TSource);
  198. }
  199. public static Task<TSource> FirstOrDefault<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  200. {
  201. if (source == null)
  202. throw new ArgumentNullException(nameof(source));
  203. if (predicate == null)
  204. throw new ArgumentNullException(nameof(predicate));
  205. return source.Where(predicate).FirstOrDefault(cancellationToken);
  206. }
  207. public static Task<TSource> Last<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  208. {
  209. if (source == null)
  210. throw new ArgumentNullException(nameof(source));
  211. return Last_(source, cancellationToken);
  212. }
  213. private static async Task<TSource> Last_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  214. {
  215. var last = default(TSource);
  216. var hasLast = false;
  217. using (var e = source.GetEnumerator())
  218. {
  219. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  220. {
  221. hasLast = true;
  222. last = e.Current;
  223. }
  224. }
  225. if (!hasLast)
  226. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  227. return last;
  228. }
  229. public static Task<TSource> Last<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  230. {
  231. if (source == null)
  232. throw new ArgumentNullException(nameof(source));
  233. if (predicate == null)
  234. throw new ArgumentNullException(nameof(predicate));
  235. return source.Where(predicate).Last(cancellationToken);
  236. }
  237. public static Task<TSource> LastOrDefault<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  238. {
  239. if (source == null)
  240. throw new ArgumentNullException(nameof(source));
  241. return LastOrDefault_(source, cancellationToken);
  242. }
  243. private static async Task<TSource> LastOrDefault_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  244. {
  245. var last = default(TSource);
  246. var hasLast = false;
  247. using (var e = source.GetEnumerator())
  248. {
  249. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  250. {
  251. hasLast = true;
  252. last = e.Current;
  253. }
  254. }
  255. return !hasLast ? default(TSource) : last;
  256. }
  257. public static Task<TSource> LastOrDefault<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  258. {
  259. if (source == null)
  260. throw new ArgumentNullException(nameof(source));
  261. if (predicate == null)
  262. throw new ArgumentNullException(nameof(predicate));
  263. return source.Where(predicate).LastOrDefault(cancellationToken);
  264. }
  265. public static Task<TSource> Single<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  266. {
  267. if (source == null)
  268. throw new ArgumentNullException(nameof(source));
  269. return Single_(source, cancellationToken);
  270. }
  271. private static async Task<TSource> Single_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  272. {
  273. using (var e = source.GetEnumerator())
  274. {
  275. if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
  276. {
  277. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  278. }
  279. var result = e.Current;
  280. if (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  281. {
  282. throw new InvalidOperationException(Strings.MORE_THAN_ONE_ELEMENT);
  283. }
  284. return result;
  285. }
  286. }
  287. public static Task<TSource> Single<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  288. {
  289. if (source == null)
  290. throw new ArgumentNullException(nameof(source));
  291. if (predicate == null)
  292. throw new ArgumentNullException(nameof(predicate));
  293. return source.Where(predicate).Single(cancellationToken);
  294. }
  295. public static Task<TSource> SingleOrDefault<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  296. {
  297. if (source == null)
  298. throw new ArgumentNullException(nameof(source));
  299. return SingleOrDefault_(source, cancellationToken);
  300. }
  301. private static async Task<TSource> SingleOrDefault_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  302. {
  303. using (var e = source.GetEnumerator())
  304. {
  305. if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
  306. {
  307. return default(TSource);
  308. }
  309. TSource result = e.Current;
  310. if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
  311. {
  312. return result;
  313. }
  314. }
  315. throw new InvalidOperationException(Strings.MORE_THAN_ONE_ELEMENT);
  316. }
  317. public static Task<TSource> SingleOrDefault<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
  318. {
  319. if (source == null)
  320. throw new ArgumentNullException(nameof(source));
  321. if (predicate == null)
  322. throw new ArgumentNullException(nameof(predicate));
  323. return source.Where(predicate).SingleOrDefault(cancellationToken);
  324. }
  325. public static Task<TSource> ElementAt<TSource>(this IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
  326. {
  327. if (source == null)
  328. throw new ArgumentNullException(nameof(source));
  329. if (index < 0)
  330. throw new ArgumentOutOfRangeException(nameof(index));
  331. return ElementAt_(source, index, cancellationToken);
  332. }
  333. private static async Task<TSource> ElementAt_<TSource>(IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
  334. {
  335. if (index >= 0)
  336. {
  337. using (var e = source.GetEnumerator())
  338. {
  339. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  340. {
  341. if (index == 0)
  342. {
  343. return e.Current;
  344. }
  345. index--;
  346. }
  347. }
  348. }
  349. throw new ArgumentOutOfRangeException(nameof(index));
  350. }
  351. public static Task<TSource> ElementAtOrDefault<TSource>(this IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
  352. {
  353. if (source == null)
  354. throw new ArgumentNullException(nameof(source));
  355. if (index < 0)
  356. throw new ArgumentOutOfRangeException(nameof(index));
  357. return ElementAtOrDefault_(source, index, cancellationToken);
  358. }
  359. private static async Task<TSource> ElementAtOrDefault_<TSource>(IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
  360. {
  361. if (index >= 0)
  362. {
  363. using (var e = source.GetEnumerator())
  364. {
  365. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  366. {
  367. if (index == 0)
  368. {
  369. return e.Current;
  370. }
  371. index--;
  372. }
  373. }
  374. }
  375. return default(TSource);
  376. }
  377. public static Task<TSource[]> ToArray<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  378. {
  379. if (source == null)
  380. throw new ArgumentNullException(nameof(source));
  381. return source.Aggregate(new List<TSource>(), (list, x) => { list.Add(x); return list; }, list => list.ToArray(), cancellationToken);
  382. }
  383. public static Task<List<TSource>> ToList<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  384. {
  385. if (source == null)
  386. throw new ArgumentNullException(nameof(source));
  387. return source.Aggregate(new List<TSource>(), (list, x) => { list.Add(x); return list; }, cancellationToken);
  388. }
  389. public static Task<Dictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  390. {
  391. if (source == null)
  392. throw new ArgumentNullException(nameof(source));
  393. if (keySelector == null)
  394. throw new ArgumentNullException(nameof(keySelector));
  395. if (elementSelector == null)
  396. throw new ArgumentNullException(nameof(elementSelector));
  397. if (comparer == null)
  398. throw new ArgumentNullException(nameof(comparer));
  399. return source.Aggregate(new Dictionary<TKey, TElement>(comparer), (d, x) => { d.Add(keySelector(x), elementSelector(x)); return d; }, cancellationToken);
  400. }
  401. public static Task<Dictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, CancellationToken cancellationToken)
  402. {
  403. if (source == null)
  404. throw new ArgumentNullException(nameof(source));
  405. if (keySelector == null)
  406. throw new ArgumentNullException(nameof(keySelector));
  407. if (elementSelector == null)
  408. throw new ArgumentNullException(nameof(elementSelector));
  409. return source.ToDictionary(keySelector, elementSelector, EqualityComparer<TKey>.Default, cancellationToken);
  410. }
  411. public static Task<Dictionary<TKey, TSource>> ToDictionary<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  412. {
  413. if (source == null)
  414. throw new ArgumentNullException(nameof(source));
  415. if (keySelector == null)
  416. throw new ArgumentNullException(nameof(keySelector));
  417. if (comparer == null)
  418. throw new ArgumentNullException(nameof(comparer));
  419. return source.ToDictionary(keySelector, x => x, comparer, cancellationToken);
  420. }
  421. public static Task<Dictionary<TKey, TSource>> ToDictionary<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
  422. {
  423. if (source == null)
  424. throw new ArgumentNullException(nameof(source));
  425. if (keySelector == null)
  426. throw new ArgumentNullException(nameof(keySelector));
  427. return source.ToDictionary(keySelector, x => x, EqualityComparer<TKey>.Default, cancellationToken);
  428. }
  429. public static async Task<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  430. {
  431. if (source == null)
  432. throw new ArgumentNullException(nameof(source));
  433. if (keySelector == null)
  434. throw new ArgumentNullException(nameof(keySelector));
  435. if (elementSelector == null)
  436. throw new ArgumentNullException(nameof(elementSelector));
  437. if (comparer == null)
  438. throw new ArgumentNullException(nameof(comparer));
  439. var lookup = await Internal.Lookup<TKey, TElement>.CreateAsync(source, keySelector, elementSelector, comparer, cancellationToken).ConfigureAwait(false);
  440. return lookup;
  441. }
  442. public static Task<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, CancellationToken cancellationToken)
  443. {
  444. if (source == null)
  445. throw new ArgumentNullException(nameof(source));
  446. if (keySelector == null)
  447. throw new ArgumentNullException(nameof(keySelector));
  448. if (elementSelector == null)
  449. throw new ArgumentNullException(nameof(elementSelector));
  450. return source.ToLookup(keySelector, elementSelector, EqualityComparer<TKey>.Default, cancellationToken);
  451. }
  452. public static Task<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  453. {
  454. if (source == null)
  455. throw new ArgumentNullException(nameof(source));
  456. if (keySelector == null)
  457. throw new ArgumentNullException(nameof(keySelector));
  458. if (comparer == null)
  459. throw new ArgumentNullException(nameof(comparer));
  460. return source.ToLookup(keySelector, x => x, comparer, cancellationToken);
  461. }
  462. public static Task<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
  463. {
  464. if (source == null)
  465. throw new ArgumentNullException(nameof(source));
  466. if (keySelector == null)
  467. throw new ArgumentNullException(nameof(keySelector));
  468. return source.ToLookup(keySelector, x => x, EqualityComparer<TKey>.Default, cancellationToken);
  469. }
  470. public static Task<double> Average(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
  471. {
  472. if (source == null)
  473. throw new ArgumentNullException(nameof(source));
  474. return Average_(source, cancellationToken);
  475. }
  476. private static async Task<double> Average_(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
  477. {
  478. using (var e = source.GetEnumerator())
  479. {
  480. if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
  481. {
  482. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  483. }
  484. long sum = e.Current;
  485. long count = 1;
  486. checked
  487. {
  488. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  489. {
  490. sum += e.Current;
  491. ++count;
  492. }
  493. }
  494. return (double)sum / count;
  495. }
  496. }
  497. public static Task<double?> Average(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
  498. {
  499. if (source == null)
  500. throw new ArgumentNullException(nameof(source));
  501. return Average_(source, cancellationToken);
  502. }
  503. private static async Task<double?> Average_(IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
  504. {
  505. using (var e = source.GetEnumerator())
  506. {
  507. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  508. {
  509. int? v = e.Current;
  510. if (v.HasValue)
  511. {
  512. long sum = v.GetValueOrDefault();
  513. long count = 1;
  514. checked
  515. {
  516. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  517. {
  518. v = e.Current;
  519. if (v.HasValue)
  520. {
  521. sum += v.GetValueOrDefault();
  522. ++count;
  523. }
  524. }
  525. }
  526. return (double)sum / count;
  527. }
  528. }
  529. }
  530. return null;
  531. }
  532. public static Task<double> Average(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
  533. {
  534. if (source == null)
  535. throw new ArgumentNullException(nameof(source));
  536. return Average_(source, cancellationToken);
  537. }
  538. private static async Task<double> Average_(IAsyncEnumerable<long> source, CancellationToken cancellationToken)
  539. {
  540. using (var e = source.GetEnumerator())
  541. {
  542. if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
  543. {
  544. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  545. }
  546. long sum = e.Current;
  547. long count = 1;
  548. checked
  549. {
  550. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  551. {
  552. sum += e.Current;
  553. ++count;
  554. }
  555. }
  556. return (double)sum / count;
  557. }
  558. }
  559. public static Task<double?> Average(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
  560. {
  561. if (source == null)
  562. throw new ArgumentNullException(nameof(source));
  563. return Average_(source, cancellationToken);
  564. }
  565. private static async Task<double?> Average_(IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
  566. {
  567. using (var e = source.GetEnumerator())
  568. {
  569. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  570. {
  571. long? v = e.Current;
  572. if (v.HasValue)
  573. {
  574. long sum = v.GetValueOrDefault();
  575. long count = 1;
  576. checked
  577. {
  578. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  579. {
  580. v = e.Current;
  581. if (v.HasValue)
  582. {
  583. sum += v.GetValueOrDefault();
  584. ++count;
  585. }
  586. }
  587. }
  588. return (double)sum / count;
  589. }
  590. }
  591. }
  592. return null;
  593. }
  594. public static Task<double> Average(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
  595. {
  596. if (source == null)
  597. throw new ArgumentNullException(nameof(source));
  598. return Average_(source, cancellationToken);
  599. }
  600. private static async Task<double> Average_(IAsyncEnumerable<double> source, CancellationToken cancellationToken)
  601. {
  602. using (var e = source.GetEnumerator())
  603. {
  604. if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
  605. {
  606. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  607. }
  608. double sum = e.Current;
  609. long count = 1;
  610. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  611. {
  612. // There is an opportunity to short-circuit here, in that if e.Current is
  613. // ever NaN then the result will always be NaN. Assuming that this case is
  614. // rare enough that not checking is the better approach generally.
  615. sum += e.Current;
  616. ++count;
  617. }
  618. return sum / count;
  619. }
  620. }
  621. public static Task<double?> Average(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
  622. {
  623. if (source == null)
  624. throw new ArgumentNullException(nameof(source));
  625. return Average_(source, cancellationToken);
  626. }
  627. private static async Task<double?> Average_(IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
  628. {
  629. using (var e = source.GetEnumerator())
  630. {
  631. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  632. {
  633. double? v = e.Current;
  634. if (v.HasValue)
  635. {
  636. double sum = v.GetValueOrDefault();
  637. long count = 1;
  638. checked
  639. {
  640. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  641. {
  642. v = e.Current;
  643. if (v.HasValue)
  644. {
  645. sum += v.GetValueOrDefault();
  646. ++count;
  647. }
  648. }
  649. }
  650. return sum / count;
  651. }
  652. }
  653. }
  654. return null;
  655. }
  656. public static Task<float> Average(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
  657. {
  658. if (source == null)
  659. throw new ArgumentNullException(nameof(source));
  660. return Average_(source, cancellationToken);
  661. }
  662. private static async Task<float> Average_(IAsyncEnumerable<float> source, CancellationToken cancellationToken)
  663. {
  664. using (var e = source.GetEnumerator())
  665. {
  666. if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
  667. {
  668. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  669. }
  670. double sum = e.Current;
  671. long count = 1;
  672. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  673. {
  674. sum += e.Current;
  675. ++count;
  676. }
  677. return (float)(sum / count);
  678. }
  679. }
  680. public static Task<float?> Average(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
  681. {
  682. if (source == null)
  683. throw new ArgumentNullException(nameof(source));
  684. return Average_(source, cancellationToken);
  685. }
  686. private static async Task<float?> Average_(IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
  687. {
  688. using (var e = source.GetEnumerator())
  689. {
  690. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  691. {
  692. float? v = e.Current;
  693. if (v.HasValue)
  694. {
  695. double sum = v.GetValueOrDefault();
  696. long count = 1;
  697. checked
  698. {
  699. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  700. {
  701. v = e.Current;
  702. if (v.HasValue)
  703. {
  704. sum += v.GetValueOrDefault();
  705. ++count;
  706. }
  707. }
  708. }
  709. return (float)(sum / count);
  710. }
  711. }
  712. }
  713. return null;
  714. }
  715. public static Task<decimal> Average(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
  716. {
  717. if (source == null)
  718. throw new ArgumentNullException(nameof(source));
  719. return Average_(source, cancellationToken);
  720. }
  721. private static async Task<decimal> Average_(IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
  722. {
  723. using (var e = source.GetEnumerator())
  724. {
  725. if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
  726. {
  727. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  728. }
  729. decimal sum = e.Current;
  730. long count = 1;
  731. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  732. {
  733. sum += e.Current;
  734. ++count;
  735. }
  736. return sum / count;
  737. }
  738. }
  739. public static Task<decimal?> Average(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
  740. {
  741. if (source == null)
  742. throw new ArgumentNullException(nameof(source));
  743. return Average_(source, cancellationToken);
  744. }
  745. private static async Task<decimal?> Average_(IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
  746. {
  747. using (var e = source.GetEnumerator())
  748. {
  749. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  750. {
  751. decimal? v = e.Current;
  752. if (v.HasValue)
  753. {
  754. decimal sum = v.GetValueOrDefault();
  755. long count = 1;
  756. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  757. {
  758. v = e.Current;
  759. if (v.HasValue)
  760. {
  761. sum += v.GetValueOrDefault();
  762. ++count;
  763. }
  764. }
  765. return sum / count;
  766. }
  767. }
  768. }
  769. return null;
  770. }
  771. public static Task<double?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
  772. {
  773. if (source == null)
  774. throw new ArgumentNullException(nameof(source));
  775. if (selector == null)
  776. throw new ArgumentNullException(nameof(selector));
  777. return source.Select(selector).Average(cancellationToken);
  778. }
  779. public static Task<double> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
  780. {
  781. if (source == null)
  782. throw new ArgumentNullException(nameof(source));
  783. if (selector == null)
  784. throw new ArgumentNullException(nameof(selector));
  785. return source.Select(selector).Average(cancellationToken);
  786. }
  787. public static Task<double> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
  788. {
  789. if (source == null)
  790. throw new ArgumentNullException(nameof(source));
  791. if (selector == null)
  792. throw new ArgumentNullException(nameof(selector));
  793. return source.Select(selector).Average(cancellationToken);
  794. }
  795. public static Task<double?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
  796. {
  797. if (source == null)
  798. throw new ArgumentNullException(nameof(source));
  799. if (selector == null)
  800. throw new ArgumentNullException(nameof(selector));
  801. return source.Select(selector).Average(cancellationToken);
  802. }
  803. public static Task<double> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
  804. {
  805. if (source == null)
  806. throw new ArgumentNullException(nameof(source));
  807. if (selector == null)
  808. throw new ArgumentNullException(nameof(selector));
  809. return source.Select(selector).Average(cancellationToken);
  810. }
  811. public static Task<double?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
  812. {
  813. if (source == null)
  814. throw new ArgumentNullException(nameof(source));
  815. if (selector == null)
  816. throw new ArgumentNullException(nameof(selector));
  817. return source.Select(selector).Average(cancellationToken);
  818. }
  819. public static Task<float> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
  820. {
  821. if (source == null)
  822. throw new ArgumentNullException(nameof(source));
  823. if (selector == null)
  824. throw new ArgumentNullException(nameof(selector));
  825. return source.Select(selector).Average(cancellationToken);
  826. }
  827. public static Task<float?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
  828. {
  829. if (source == null)
  830. throw new ArgumentNullException(nameof(source));
  831. if (selector == null)
  832. throw new ArgumentNullException(nameof(selector));
  833. return source.Select(selector).Average(cancellationToken);
  834. }
  835. public static Task<decimal> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
  836. {
  837. if (source == null)
  838. throw new ArgumentNullException(nameof(source));
  839. if (selector == null)
  840. throw new ArgumentNullException(nameof(selector));
  841. return source.Select(selector).Average(cancellationToken);
  842. }
  843. public static Task<decimal?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
  844. {
  845. if (source == null)
  846. throw new ArgumentNullException(nameof(source));
  847. if (selector == null)
  848. throw new ArgumentNullException(nameof(selector));
  849. return source.Select(selector).Average(cancellationToken);
  850. }
  851. public static Task<int> Max(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
  852. {
  853. if (source == null)
  854. throw new ArgumentNullException(nameof(source));
  855. return source.Aggregate(Math.Max, cancellationToken);
  856. }
  857. public static Task<long> Max(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
  858. {
  859. if (source == null)
  860. throw new ArgumentNullException(nameof(source));
  861. return source.Aggregate(Math.Max, cancellationToken);
  862. }
  863. public static Task<double> Max(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
  864. {
  865. if (source == null)
  866. throw new ArgumentNullException(nameof(source));
  867. return source.Aggregate(Math.Max, cancellationToken);
  868. }
  869. public static Task<float> Max(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
  870. {
  871. if (source == null)
  872. throw new ArgumentNullException(nameof(source));
  873. return source.Aggregate(Math.Max, cancellationToken);
  874. }
  875. public static Task<decimal> Max(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
  876. {
  877. if (source == null)
  878. throw new ArgumentNullException(nameof(source));
  879. return source.Aggregate(Math.Max, cancellationToken);
  880. }
  881. static T? NullableMax<T>(T? x, T? y)
  882. where T : struct, IComparable<T>
  883. {
  884. if (!x.HasValue)
  885. return y;
  886. if (!y.HasValue)
  887. return x;
  888. if (x.Value.CompareTo(y.Value) >= 0)
  889. return x;
  890. return y;
  891. }
  892. public static Task<int?> Max(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
  893. {
  894. if (source == null)
  895. throw new ArgumentNullException(nameof(source));
  896. return source.Aggregate(default(int?), NullableMax, cancellationToken);
  897. }
  898. public static Task<long?> Max(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
  899. {
  900. if (source == null)
  901. throw new ArgumentNullException(nameof(source));
  902. return source.Aggregate(default(long?), NullableMax, cancellationToken);
  903. }
  904. public static Task<double?> Max(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
  905. {
  906. if (source == null)
  907. throw new ArgumentNullException(nameof(source));
  908. return source.Aggregate(default(double?), NullableMax, cancellationToken);
  909. }
  910. public static Task<float?> Max(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
  911. {
  912. if (source == null)
  913. throw new ArgumentNullException(nameof(source));
  914. return source.Aggregate(default(float?), NullableMax, cancellationToken);
  915. }
  916. public static Task<decimal?> Max(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
  917. {
  918. if (source == null)
  919. throw new ArgumentNullException(nameof(source));
  920. return source.Aggregate(default(decimal?), NullableMax, cancellationToken);
  921. }
  922. public static Task<TSource> Max<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  923. {
  924. if (source == null)
  925. throw new ArgumentNullException(nameof(source));
  926. var comparer = Comparer<TSource>.Default;
  927. return source.Aggregate((x, y) => comparer.Compare(x, y) >= 0 ? x : y, cancellationToken);
  928. }
  929. public static Task<int> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
  930. {
  931. if (source == null)
  932. throw new ArgumentNullException(nameof(source));
  933. if (selector == null)
  934. throw new ArgumentNullException(nameof(selector));
  935. return source.Select(selector).Max(cancellationToken);
  936. }
  937. public static Task<long> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
  938. {
  939. if (source == null)
  940. throw new ArgumentNullException(nameof(source));
  941. if (selector == null)
  942. throw new ArgumentNullException(nameof(selector));
  943. return source.Select(selector).Max(cancellationToken);
  944. }
  945. public static Task<double> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
  946. {
  947. if (source == null)
  948. throw new ArgumentNullException(nameof(source));
  949. if (selector == null)
  950. throw new ArgumentNullException(nameof(selector));
  951. return source.Select(selector).Max(cancellationToken);
  952. }
  953. public static Task<float> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
  954. {
  955. if (source == null)
  956. throw new ArgumentNullException(nameof(source));
  957. if (selector == null)
  958. throw new ArgumentNullException(nameof(selector));
  959. return source.Select(selector).Max(cancellationToken);
  960. }
  961. public static Task<decimal> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
  962. {
  963. if (source == null)
  964. throw new ArgumentNullException(nameof(source));
  965. if (selector == null)
  966. throw new ArgumentNullException(nameof(selector));
  967. return source.Select(selector).Max(cancellationToken);
  968. }
  969. public static Task<int?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
  970. {
  971. if (source == null)
  972. throw new ArgumentNullException(nameof(source));
  973. if (selector == null)
  974. throw new ArgumentNullException(nameof(selector));
  975. return source.Select(selector).Max(cancellationToken);
  976. }
  977. public static Task<long?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
  978. {
  979. if (source == null)
  980. throw new ArgumentNullException(nameof(source));
  981. if (selector == null)
  982. throw new ArgumentNullException(nameof(selector));
  983. return source.Select(selector).Max(cancellationToken);
  984. }
  985. public static Task<double?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
  986. {
  987. if (source == null)
  988. throw new ArgumentNullException(nameof(source));
  989. if (selector == null)
  990. throw new ArgumentNullException(nameof(selector));
  991. return source.Select(selector).Max(cancellationToken);
  992. }
  993. public static Task<float?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
  994. {
  995. if (source == null)
  996. throw new ArgumentNullException(nameof(source));
  997. if (selector == null)
  998. throw new ArgumentNullException(nameof(selector));
  999. return source.Select(selector).Max(cancellationToken);
  1000. }
  1001. public static Task<decimal?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
  1002. {
  1003. if (source == null)
  1004. throw new ArgumentNullException(nameof(source));
  1005. if (selector == null)
  1006. throw new ArgumentNullException(nameof(selector));
  1007. return source.Select(selector).Max(cancellationToken);
  1008. }
  1009. public static Task<TResult> Max<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector, CancellationToken cancellationToken)
  1010. {
  1011. if (source == null)
  1012. throw new ArgumentNullException(nameof(source));
  1013. if (selector == null)
  1014. throw new ArgumentNullException(nameof(selector));
  1015. return source.Select(selector).Max(cancellationToken);
  1016. }
  1017. public static Task<int> Min(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
  1018. {
  1019. if (source == null)
  1020. throw new ArgumentNullException(nameof(source));
  1021. return source.Aggregate(Math.Min, cancellationToken);
  1022. }
  1023. public static Task<long> Min(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
  1024. {
  1025. if (source == null)
  1026. throw new ArgumentNullException(nameof(source));
  1027. return source.Aggregate(Math.Min, cancellationToken);
  1028. }
  1029. public static Task<double> Min(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
  1030. {
  1031. if (source == null)
  1032. throw new ArgumentNullException(nameof(source));
  1033. return source.Aggregate(Math.Min, cancellationToken);
  1034. }
  1035. public static Task<float> Min(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
  1036. {
  1037. if (source == null)
  1038. throw new ArgumentNullException(nameof(source));
  1039. return source.Aggregate(Math.Min, cancellationToken);
  1040. }
  1041. public static Task<decimal> Min(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
  1042. {
  1043. if (source == null)
  1044. throw new ArgumentNullException(nameof(source));
  1045. return source.Aggregate(Math.Min, cancellationToken);
  1046. }
  1047. static T? NullableMin<T>(T? x, T? y)
  1048. where T : struct, IComparable<T>
  1049. {
  1050. if (!x.HasValue)
  1051. return y;
  1052. if (!y.HasValue)
  1053. return x;
  1054. if (x.Value.CompareTo(y.Value) <= 0)
  1055. return x;
  1056. return y;
  1057. }
  1058. public static Task<int?> Min(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
  1059. {
  1060. if (source == null)
  1061. throw new ArgumentNullException(nameof(source));
  1062. return source.Aggregate(default(int?), NullableMin, cancellationToken);
  1063. }
  1064. public static Task<long?> Min(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
  1065. {
  1066. if (source == null)
  1067. throw new ArgumentNullException(nameof(source));
  1068. return source.Aggregate(default(long?), NullableMin, cancellationToken);
  1069. }
  1070. public static Task<double?> Min(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
  1071. {
  1072. if (source == null)
  1073. throw new ArgumentNullException(nameof(source));
  1074. return source.Aggregate(default(double?), NullableMin, cancellationToken);
  1075. }
  1076. public static Task<float?> Min(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
  1077. {
  1078. if (source == null)
  1079. throw new ArgumentNullException(nameof(source));
  1080. return source.Aggregate(default(float?), NullableMin, cancellationToken);
  1081. }
  1082. public static Task<decimal?> Min(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
  1083. {
  1084. if (source == null)
  1085. throw new ArgumentNullException(nameof(source));
  1086. return source.Aggregate(default(decimal?), NullableMin, cancellationToken);
  1087. }
  1088. public static Task<TSource> Min<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  1089. {
  1090. if (source == null)
  1091. throw new ArgumentNullException(nameof(source));
  1092. var comparer = Comparer<TSource>.Default;
  1093. return source.Aggregate((x, y) => comparer.Compare(x, y) <= 0 ? x : y, cancellationToken);
  1094. }
  1095. public static Task<int> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
  1096. {
  1097. if (source == null)
  1098. throw new ArgumentNullException(nameof(source));
  1099. if (selector == null)
  1100. throw new ArgumentNullException(nameof(selector));
  1101. return source.Select(selector).Min(cancellationToken);
  1102. }
  1103. public static Task<long> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
  1104. {
  1105. if (source == null)
  1106. throw new ArgumentNullException(nameof(source));
  1107. if (selector == null)
  1108. throw new ArgumentNullException(nameof(selector));
  1109. return source.Select(selector).Min(cancellationToken);
  1110. }
  1111. public static Task<double> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
  1112. {
  1113. if (source == null)
  1114. throw new ArgumentNullException(nameof(source));
  1115. if (selector == null)
  1116. throw new ArgumentNullException(nameof(selector));
  1117. return source.Select(selector).Min(cancellationToken);
  1118. }
  1119. public static Task<float> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
  1120. {
  1121. if (source == null)
  1122. throw new ArgumentNullException(nameof(source));
  1123. if (selector == null)
  1124. throw new ArgumentNullException(nameof(selector));
  1125. return source.Select(selector).Min(cancellationToken);
  1126. }
  1127. public static Task<decimal> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
  1128. {
  1129. if (source == null)
  1130. throw new ArgumentNullException(nameof(source));
  1131. if (selector == null)
  1132. throw new ArgumentNullException(nameof(selector));
  1133. return source.Select(selector).Min(cancellationToken);
  1134. }
  1135. public static Task<int?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
  1136. {
  1137. if (source == null)
  1138. throw new ArgumentNullException(nameof(source));
  1139. if (selector == null)
  1140. throw new ArgumentNullException(nameof(selector));
  1141. return source.Select(selector).Min(cancellationToken);
  1142. }
  1143. public static Task<long?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
  1144. {
  1145. if (source == null)
  1146. throw new ArgumentNullException(nameof(source));
  1147. if (selector == null)
  1148. throw new ArgumentNullException(nameof(selector));
  1149. return source.Select(selector).Min(cancellationToken);
  1150. }
  1151. public static Task<double?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
  1152. {
  1153. if (source == null)
  1154. throw new ArgumentNullException(nameof(source));
  1155. if (selector == null)
  1156. throw new ArgumentNullException(nameof(selector));
  1157. return source.Select(selector).Min(cancellationToken);
  1158. }
  1159. public static Task<float?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
  1160. {
  1161. if (source == null)
  1162. throw new ArgumentNullException(nameof(source));
  1163. if (selector == null)
  1164. throw new ArgumentNullException(nameof(selector));
  1165. return source.Select(selector).Min(cancellationToken);
  1166. }
  1167. public static Task<decimal?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
  1168. {
  1169. if (source == null)
  1170. throw new ArgumentNullException(nameof(source));
  1171. if (selector == null)
  1172. throw new ArgumentNullException(nameof(selector));
  1173. return source.Select(selector).Min(cancellationToken);
  1174. }
  1175. public static Task<TResult> Min<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector, CancellationToken cancellationToken)
  1176. {
  1177. if (source == null)
  1178. throw new ArgumentNullException(nameof(source));
  1179. if (selector == null)
  1180. throw new ArgumentNullException(nameof(selector));
  1181. return source.Select(selector).Min(cancellationToken);
  1182. }
  1183. public static Task<int> Sum(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
  1184. {
  1185. if (source == null)
  1186. throw new ArgumentNullException(nameof(source));
  1187. return source.Aggregate(0, (x, y) => x + y, cancellationToken);
  1188. }
  1189. public static Task<long> Sum(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
  1190. {
  1191. if (source == null)
  1192. throw new ArgumentNullException(nameof(source));
  1193. return source.Aggregate(0L, (x, y) => x + y, cancellationToken);
  1194. }
  1195. public static Task<double> Sum(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
  1196. {
  1197. if (source == null)
  1198. throw new ArgumentNullException(nameof(source));
  1199. return source.Aggregate(0.0, (x, y) => x + y, cancellationToken);
  1200. }
  1201. public static Task<float> Sum(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
  1202. {
  1203. if (source == null)
  1204. throw new ArgumentNullException(nameof(source));
  1205. return source.Aggregate(0f, (x, y) => x + y, cancellationToken);
  1206. }
  1207. public static Task<decimal> Sum(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
  1208. {
  1209. if (source == null)
  1210. throw new ArgumentNullException(nameof(source));
  1211. return source.Aggregate(0m, (x, y) => x + y, cancellationToken);
  1212. }
  1213. public static Task<int?> Sum(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
  1214. {
  1215. if (source == null)
  1216. throw new ArgumentNullException(nameof(source));
  1217. return source.Aggregate((int?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1218. }
  1219. public static Task<long?> Sum(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
  1220. {
  1221. if (source == null)
  1222. throw new ArgumentNullException(nameof(source));
  1223. return source.Aggregate((long?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1224. }
  1225. public static Task<double?> Sum(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
  1226. {
  1227. if (source == null)
  1228. throw new ArgumentNullException(nameof(source));
  1229. return source.Aggregate((double?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1230. }
  1231. public static Task<float?> Sum(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
  1232. {
  1233. if (source == null)
  1234. throw new ArgumentNullException(nameof(source));
  1235. return source.Aggregate((float?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1236. }
  1237. public static Task<decimal?> Sum(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
  1238. {
  1239. if (source == null)
  1240. throw new ArgumentNullException(nameof(source));
  1241. return source.Aggregate((decimal?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
  1242. }
  1243. public static Task<int> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
  1244. {
  1245. if (source == null)
  1246. throw new ArgumentNullException(nameof(source));
  1247. if (selector == null)
  1248. throw new ArgumentNullException(nameof(selector));
  1249. return source.Select(selector).Sum(cancellationToken);
  1250. }
  1251. public static Task<long> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
  1252. {
  1253. if (source == null)
  1254. throw new ArgumentNullException(nameof(source));
  1255. if (selector == null)
  1256. throw new ArgumentNullException(nameof(selector));
  1257. return source.Select(selector).Sum(cancellationToken);
  1258. }
  1259. public static Task<double> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
  1260. {
  1261. if (source == null)
  1262. throw new ArgumentNullException(nameof(source));
  1263. if (selector == null)
  1264. throw new ArgumentNullException(nameof(selector));
  1265. return source.Select(selector).Sum(cancellationToken);
  1266. }
  1267. public static Task<float> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
  1268. {
  1269. if (source == null)
  1270. throw new ArgumentNullException(nameof(source));
  1271. if (selector == null)
  1272. throw new ArgumentNullException(nameof(selector));
  1273. return source.Select(selector).Sum(cancellationToken);
  1274. }
  1275. public static Task<decimal> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
  1276. {
  1277. if (source == null)
  1278. throw new ArgumentNullException(nameof(source));
  1279. if (selector == null)
  1280. throw new ArgumentNullException(nameof(selector));
  1281. return source.Select(selector).Sum(cancellationToken);
  1282. }
  1283. public static Task<int?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
  1284. {
  1285. if (source == null)
  1286. throw new ArgumentNullException(nameof(source));
  1287. if (selector == null)
  1288. throw new ArgumentNullException(nameof(selector));
  1289. return source.Select(selector).Sum(cancellationToken);
  1290. }
  1291. public static Task<long?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
  1292. {
  1293. if (source == null)
  1294. throw new ArgumentNullException(nameof(source));
  1295. if (selector == null)
  1296. throw new ArgumentNullException(nameof(selector));
  1297. return source.Select(selector).Sum(cancellationToken);
  1298. }
  1299. public static Task<double?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
  1300. {
  1301. if (source == null)
  1302. throw new ArgumentNullException(nameof(source));
  1303. if (selector == null)
  1304. throw new ArgumentNullException(nameof(selector));
  1305. return source.Select(selector).Sum(cancellationToken);
  1306. }
  1307. public static Task<float?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
  1308. {
  1309. if (source == null)
  1310. throw new ArgumentNullException(nameof(source));
  1311. if (selector == null)
  1312. throw new ArgumentNullException(nameof(selector));
  1313. return source.Select(selector).Sum(cancellationToken);
  1314. }
  1315. public static Task<decimal?> Sum<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
  1316. {
  1317. if (source == null)
  1318. throw new ArgumentNullException(nameof(source));
  1319. if (selector == null)
  1320. throw new ArgumentNullException(nameof(selector));
  1321. return source.Select(selector).Sum(cancellationToken);
  1322. }
  1323. public static Task<bool> IsEmpty<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  1324. {
  1325. if (source == null)
  1326. throw new ArgumentNullException(nameof(source));
  1327. return IsEmpty_(source, cancellationToken);
  1328. }
  1329. private static async Task<bool> IsEmpty_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
  1330. {
  1331. return !await source.Any(cancellationToken).ConfigureAwait(false);
  1332. }
  1333. public static Task<TSource> Min<TSource>(this IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
  1334. {
  1335. if (source == null)
  1336. throw new ArgumentNullException(nameof(source));
  1337. if (comparer == null)
  1338. throw new ArgumentNullException(nameof(comparer));
  1339. return Min_(source, comparer, cancellationToken);
  1340. }
  1341. private static async Task<TSource> Min_<TSource>(IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
  1342. {
  1343. return (await MinBy(source, x => x, comparer, cancellationToken).ConfigureAwait(false)).First();
  1344. }
  1345. public static Task<IList<TSource>> MinBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
  1346. {
  1347. if (source == null)
  1348. throw new ArgumentNullException(nameof(source));
  1349. if (keySelector == null)
  1350. throw new ArgumentNullException(nameof(keySelector));
  1351. return MinBy(source, keySelector, Comparer<TKey>.Default, cancellationToken);
  1352. }
  1353. public static Task<IList<TSource>> MinBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
  1354. {
  1355. if (source == null)
  1356. throw new ArgumentNullException(nameof(source));
  1357. if (keySelector == null)
  1358. throw new ArgumentNullException(nameof(keySelector));
  1359. if (comparer == null)
  1360. throw new ArgumentNullException(nameof(comparer));
  1361. return ExtremaBy(source, keySelector, (key, minValue) => -comparer.Compare(key, minValue), cancellationToken);
  1362. }
  1363. public static Task<TSource> Max<TSource>(this IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
  1364. {
  1365. if (source == null)
  1366. throw new ArgumentNullException(nameof(source));
  1367. if (comparer == null)
  1368. throw new ArgumentNullException(nameof(comparer));
  1369. return Max_(source, comparer, cancellationToken);
  1370. }
  1371. private static async Task<TSource> Max_<TSource>(IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
  1372. {
  1373. return (await MaxBy(source, x => x, comparer, cancellationToken).ConfigureAwait(false)).First();
  1374. }
  1375. public static Task<IList<TSource>> MaxBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
  1376. {
  1377. if (source == null)
  1378. throw new ArgumentNullException(nameof(source));
  1379. if (keySelector == null)
  1380. throw new ArgumentNullException(nameof(keySelector));
  1381. return MaxBy(source, keySelector, Comparer<TKey>.Default, cancellationToken);
  1382. }
  1383. public static Task<IList<TSource>> MaxBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
  1384. {
  1385. if (source == null)
  1386. throw new ArgumentNullException(nameof(source));
  1387. if (keySelector == null)
  1388. throw new ArgumentNullException(nameof(keySelector));
  1389. if (comparer == null)
  1390. throw new ArgumentNullException(nameof(comparer));
  1391. return ExtremaBy(source, keySelector, (key, minValue) => comparer.Compare(key, minValue), cancellationToken);
  1392. }
  1393. private static async Task<IList<TSource>> ExtremaBy<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TKey, TKey, int> compare, CancellationToken cancellationToken)
  1394. {
  1395. var result = new List<TSource>();
  1396. using (var e = source.GetEnumerator())
  1397. {
  1398. if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
  1399. throw new InvalidOperationException(Strings.NO_ELEMENTS);
  1400. var current = e.Current;
  1401. var resKey = keySelector(current);
  1402. result.Add(current);
  1403. while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
  1404. {
  1405. var cur = e.Current;
  1406. var key = keySelector(cur);
  1407. var cmp = compare(key, resKey);
  1408. if (cmp == 0)
  1409. {
  1410. result.Add(cur);
  1411. }
  1412. else if (cmp > 0)
  1413. {
  1414. result = new List<TSource> { cur };
  1415. resKey = key;
  1416. }
  1417. }
  1418. }
  1419. return result;
  1420. }
  1421. }
  1422. }