// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;

namespace System.Linq
{
    // Aggregate-style operators over IAsyncEnumerable.
    //
    // Conventions used throughout this part of the class:
    //  * Public operators validate their arguments eagerly (before any task is created)
    //    and then either delegate to another public operator or to a private async
    //    helper whose name ends in '_'.
    //  * Helpers obtain the enumerator inside a 'using' so it is disposed on every path,
    //    and await MoveNext with ConfigureAwait(false).
    //
    // NOTE(review): the generic type argument lists in this section were reconstructed from
    // the surrounding usage (default(int?), explicit long/double locals, (float) casts, type
    // parameter names). The relative order of overloads whose bodies are identical
    // (Average/Max/Min selector overloads, non-nullable Max/Min) is an assumption - confirm
    // against the project's public API listing.
    public static partial class AsyncEnumerable
    {
        /// <summary>Folds the sequence into an accumulator that starts at <paramref name="seed"/>, then maps the final accumulator through <paramref name="resultSelector"/>.</summary>
        public static Task<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (accumulator == null) throw new ArgumentNullException(nameof(accumulator));
            if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector));

            return Aggregate_(source, seed, accumulator, resultSelector, cancellationToken);
        }

        // Enumerates the whole source, applying the accumulator to each element in order.
        private static async Task<TResult> Aggregate_<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
        {
            var acc = seed;

            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    acc = accumulator(acc, e.Current);
                }
            }

            return resultSelector(acc);
        }

        /// <summary>Folds the sequence into an accumulator that starts at <paramref name="seed"/> and returns the final accumulator.</summary>
        public static Task<TAccumulate> Aggregate<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (accumulator == null) throw new ArgumentNullException(nameof(accumulator));

            return source.Aggregate(seed, accumulator, x => x, cancellationToken);
        }

        /// <summary>Folds the sequence using its first element as the initial accumulator; the task faults with <see cref="InvalidOperationException"/> when the sequence is empty.</summary>
        public static Task<TSource> Aggregate<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (accumulator == null) throw new ArgumentNullException(nameof(accumulator));

            return Aggregate_(source, accumulator, cancellationToken);
        }

        // Seedless fold: the first element is taken as-is, later elements go through the accumulator.
        private static async Task<TSource> Aggregate_<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
        {
            var first = true;
            var acc = default(TSource);

            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    acc = first ? e.Current : accumulator(acc, e.Current);
                    first = false;
                }
            }

            if (first) throw new InvalidOperationException(Strings.NO_ELEMENTS);

            return acc;
        }

        /// <summary>Counts the elements; the increment is checked, so exceeding <see cref="int.MaxValue"/> overflows with an exception.</summary>
        public static Task<int> Count<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(0, (c, _) => checked(c + 1), cancellationToken);
        }

        /// <summary>Counts the elements that satisfy <paramref name="predicate"/> (checked increment).</summary>
        public static Task<int> Count<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return source.Where(predicate).Aggregate(0, (c, _) => checked(c + 1), cancellationToken);
        }

        /// <summary>Counts the elements as a 64-bit value (checked increment).</summary>
        public static Task<long> LongCount<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(0L, (c, _) => checked(c + 1), cancellationToken);
        }

        /// <summary>Counts the elements that satisfy <paramref name="predicate"/> as a 64-bit value (checked increment).</summary>
        public static Task<long> LongCount<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return source.Where(predicate).Aggregate(0L, (c, _) => checked(c + 1), cancellationToken);
        }

        /// <summary>Returns true when every element satisfies <paramref name="predicate"/>; enumeration stops at the first failure.</summary>
        public static Task<bool> All<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return All_(source, predicate, cancellationToken);
        }

        private static async Task<bool> All_<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    if (!predicate(e.Current)) return false;
                }
            }

            return true;
        }

        /// <summary>Returns true when at least one element satisfies <paramref name="predicate"/>; enumeration stops at the first match.</summary>
        public static Task<bool> Any<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return Any_(source, predicate, cancellationToken);
        }

        private static async Task<bool> Any_<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    if (predicate(e.Current)) return true;
                }
            }

            return false;
        }

        /// <summary>Returns true when the sequence has at least one element.</summary>
        public static Task<bool> Any<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Any_(source, cancellationToken);
        }

        // Goes through a helper (instead of returning MoveNext directly) so that the
        // enumerator is disposed once the first MoveNext has completed.
        private static async Task<bool> Any_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                return await e.MoveNext(cancellationToken).ConfigureAwait(false);
            }
        }

        /// <summary>Returns true when some element equals <paramref name="value"/> according to <paramref name="comparer"/>.</summary>
        public static Task<bool> Contains<TSource>(this IAsyncEnumerable<TSource> source, TSource value, IEqualityComparer<TSource> comparer, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return source.Any(x => comparer.Equals(x, value), cancellationToken);
        }

        /// <summary>Returns true when some element equals <paramref name="value"/> according to the default equality comparer.</summary>
        public static Task<bool> Contains<TSource>(this IAsyncEnumerable<TSource> source, TSource value, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Contains(value, EqualityComparer<TSource>.Default, cancellationToken);
        }

        /// <summary>Returns the first element; the task faults with <see cref="InvalidOperationException"/> when the sequence is empty.</summary>
        public static Task<TSource> First<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return First_(source, cancellationToken);
        }

        private static async Task<TSource> First_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                if (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    return e.Current;
                }
            }

            throw new InvalidOperationException(Strings.NO_ELEMENTS);
        }

        /// <summary>Returns the first element that satisfies <paramref name="predicate"/>; faults when there is none.</summary>
        public static Task<TSource> First<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return source.Where(predicate).First(cancellationToken);
        }

        /// <summary>Returns the first element, or the default value of <typeparamref name="TSource"/> when the sequence is empty.</summary>
        public static Task<TSource> FirstOrDefault<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return FirstOrDefault_(source, cancellationToken);
        }

        private static async Task<TSource> FirstOrDefault_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                if (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    return e.Current;
                }
            }

            return default(TSource);
        }

        /// <summary>Returns the first element that satisfies <paramref name="predicate"/>, or the default value when there is none.</summary>
        public static Task<TSource> FirstOrDefault<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return source.Where(predicate).FirstOrDefault(cancellationToken);
        }

        /// <summary>Returns the last element; the task faults with <see cref="InvalidOperationException"/> when the sequence is empty.</summary>
        public static Task<TSource> Last<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Last_(source, cancellationToken);
        }

        private static async Task<TSource> Last_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            var last = default(TSource);
            var hasLast = false;

            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    hasLast = true;
                    last = e.Current;
                }
            }

            if (!hasLast) throw new InvalidOperationException(Strings.NO_ELEMENTS);

            return last;
        }

        /// <summary>Returns the last element that satisfies <paramref name="predicate"/>; faults when there is none.</summary>
        public static Task<TSource> Last<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return source.Where(predicate).Last(cancellationToken);
        }

        /// <summary>Returns the last element, or the default value of <typeparamref name="TSource"/> when the sequence is empty.</summary>
        public static Task<TSource> LastOrDefault<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return LastOrDefault_(source, cancellationToken);
        }

        private static async Task<TSource> LastOrDefault_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            // Stays at default(TSource) when the loop never runs, so no "seen any" flag is needed.
            var last = default(TSource);

            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    last = e.Current;
                }
            }

            return last;
        }

        /// <summary>Returns the last element that satisfies <paramref name="predicate"/>, or the default value when there is none.</summary>
        public static Task<TSource> LastOrDefault<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return source.Where(predicate).LastOrDefault(cancellationToken);
        }

        /// <summary>Returns the only element; the task faults with <see cref="InvalidOperationException"/> when the sequence is empty or has more than one element.</summary>
        public static Task<TSource> Single<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Single_(source, cancellationToken);
        }

        private static async Task<TSource> Single_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    throw new InvalidOperationException(Strings.NO_ELEMENTS);
                }

                var result = e.Current;

                // A second successful MoveNext means the sequence is not a singleton.
                if (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    throw new InvalidOperationException(Strings.MORE_THAN_ONE_ELEMENT);
                }

                return result;
            }
        }

        /// <summary>Returns the only element that satisfies <paramref name="predicate"/>; faults when there is none or more than one.</summary>
        public static Task<TSource> Single<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return source.Where(predicate).Single(cancellationToken);
        }

        /// <summary>Returns the only element, or the default value when the sequence is empty; faults with <see cref="InvalidOperationException"/> when there is more than one element.</summary>
        public static Task<TSource> SingleOrDefault<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return SingleOrDefault_(source, cancellationToken);
        }

        private static async Task<TSource> SingleOrDefault_<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    return default(TSource);
                }

                var result = e.Current;

                if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    return result;
                }
            }

            throw new InvalidOperationException(Strings.MORE_THAN_ONE_ELEMENT);
        }

        /// <summary>Returns the only element that satisfies <paramref name="predicate"/>, or the default value when there is none; faults when there is more than one.</summary>
        public static Task<TSource> SingleOrDefault<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (predicate == null) throw new ArgumentNullException(nameof(predicate));

            return source.Where(predicate).SingleOrDefault(cancellationToken);
        }

        /// <summary>Returns the element at the zero-based <paramref name="index"/>; a negative index throws immediately, an index past the end faults the task with <see cref="ArgumentOutOfRangeException"/>.</summary>
        public static Task<TSource> ElementAt<TSource>(this IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (index < 0) throw new ArgumentOutOfRangeException(nameof(index));

            return ElementAt_(source, index, cancellationToken);
        }

        // Caller guarantees index >= 0; counts the index down while advancing.
        private static async Task<TSource> ElementAt_<TSource>(IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    if (index == 0)
                    {
                        return e.Current;
                    }

                    index--;
                }
            }

            throw new ArgumentOutOfRangeException(nameof(index));
        }

        /// <summary>Returns the element at the zero-based <paramref name="index"/>, or the default value when the sequence is shorter; a negative index throws <see cref="ArgumentOutOfRangeException"/> immediately.</summary>
        public static Task<TSource> ElementAtOrDefault<TSource>(this IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (index < 0) throw new ArgumentOutOfRangeException(nameof(index));

            return ElementAtOrDefault_(source, index, cancellationToken);
        }

        // Caller guarantees index >= 0; counts the index down while advancing.
        private static async Task<TSource> ElementAtOrDefault_<TSource>(IAsyncEnumerable<TSource> source, int index, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    if (index == 0)
                    {
                        return e.Current;
                    }

                    index--;
                }
            }

            return default(TSource);
        }

        /// <summary>Collects all elements into an array (buffered through a list first).</summary>
        public static Task<TSource[]> ToArray<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(new List<TSource>(), (list, x) => { list.Add(x); return list; }, list => list.ToArray(), cancellationToken);
        }

        /// <summary>Collects all elements into a <see cref="List{T}"/>.</summary>
        public static Task<List<TSource>> ToList<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(new List<TSource>(), (list, x) => { list.Add(x); return list; }, cancellationToken);
        }

        /// <summary>Builds a dictionary from projected keys and values using <paramref name="comparer"/>; entries are inserted with <c>Dictionary.Add</c>, so a duplicate key faults the task.</summary>
        public static Task<Dictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return source.Aggregate(new Dictionary<TKey, TElement>(comparer), (d, x) => { d.Add(keySelector(x), elementSelector(x)); return d; }, cancellationToken);
        }

        /// <summary>Builds a dictionary from projected keys and values using the default key comparer.</summary>
        public static Task<Dictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));

            return source.ToDictionary(keySelector, elementSelector, EqualityComparer<TKey>.Default, cancellationToken);
        }

        /// <summary>Builds a dictionary keyed by <paramref name="keySelector"/> whose values are the elements themselves, using <paramref name="comparer"/>.</summary>
        public static Task<Dictionary<TKey, TSource>> ToDictionary<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return source.ToDictionary(keySelector, x => x, comparer, cancellationToken);
        }

        /// <summary>Builds a dictionary keyed by <paramref name="keySelector"/> whose values are the elements themselves, using the default key comparer.</summary>
        public static Task<Dictionary<TKey, TSource>> ToDictionary<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));

            return source.ToDictionary(keySelector, x => x, EqualityComparer<TKey>.Default, cancellationToken);
        }

        /// <summary>Builds a lookup from projected keys and values using <paramref name="comparer"/>.</summary>
        public static Task<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
        {
            // Validation lives in this non-async wrapper so that null arguments throw
            // immediately, matching every other operator in this class.
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return ToLookup_(source, keySelector, elementSelector, comparer, cancellationToken);
        }

        // Awaits the concrete lookup and returns it through the ILookup interface.
        private static async Task<ILookup<TKey, TElement>> ToLookup_<TSource, TKey, TElement>(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
        {
            var lookup = await Internal.Lookup<TKey, TElement>.CreateAsync(source, keySelector, elementSelector, comparer, cancellationToken).ConfigureAwait(false);

            return lookup;
        }

        /// <summary>Builds a lookup from projected keys and values using the default key comparer.</summary>
        public static Task<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));

            return source.ToLookup(keySelector, elementSelector, EqualityComparer<TKey>.Default, cancellationToken);
        }

        /// <summary>Builds a lookup keyed by <paramref name="keySelector"/> whose values are the elements themselves, using <paramref name="comparer"/>.</summary>
        public static Task<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return source.ToLookup(keySelector, x => x, comparer, cancellationToken);
        }

        /// <summary>Builds a lookup keyed by <paramref name="keySelector"/> whose values are the elements themselves, using the default key comparer.</summary>
        public static Task<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));

            return source.ToLookup(keySelector, x => x, EqualityComparer<TKey>.Default, cancellationToken);
        }

        /// <summary>Averages a sequence of <see cref="int"/>; the task faults with <see cref="InvalidOperationException"/> when the sequence is empty.</summary>
        public static Task<double> Average(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        // Sums into a long with overflow checking, then divides as double.
        private static async Task<double> Average_(IAsyncEnumerable<int> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    throw new InvalidOperationException(Strings.NO_ELEMENTS);
                }

                long sum = e.Current;
                long count = 1;
                checked
                {
                    while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                    {
                        sum += e.Current;
                        ++count;
                    }
                }

                return (double)sum / count;
            }
        }

        /// <summary>Averages the non-null values of a sequence of nullable <see cref="int"/>; yields null when there are none.</summary>
        public static Task<double?> Average(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        // Outer loop skips leading nulls; once a value is found the inner loop consumes the
        // rest of the sequence, so the outer loop never iterates again after that.
        private static async Task<double?> Average_(IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    var v = e.Current;
                    if (v.HasValue)
                    {
                        long sum = v.GetValueOrDefault();
                        long count = 1;
                        checked
                        {
                            while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                            {
                                v = e.Current;
                                if (v.HasValue)
                                {
                                    sum += v.GetValueOrDefault();
                                    ++count;
                                }
                            }
                        }

                        return (double)sum / count;
                    }
                }
            }

            return null;
        }

        /// <summary>Averages a sequence of <see cref="long"/>; faults when the sequence is empty or the checked sum overflows.</summary>
        public static Task<double> Average(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        private static async Task<double> Average_(IAsyncEnumerable<long> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    throw new InvalidOperationException(Strings.NO_ELEMENTS);
                }

                var sum = e.Current;
                long count = 1;
                checked
                {
                    while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                    {
                        sum += e.Current;
                        ++count;
                    }
                }

                return (double)sum / count;
            }
        }

        /// <summary>Averages the non-null values of a sequence of nullable <see cref="long"/>; yields null when there are none.</summary>
        public static Task<double?> Average(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        private static async Task<double?> Average_(IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    var v = e.Current;
                    if (v.HasValue)
                    {
                        var sum = v.GetValueOrDefault();
                        long count = 1;
                        checked
                        {
                            while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                            {
                                v = e.Current;
                                if (v.HasValue)
                                {
                                    sum += v.GetValueOrDefault();
                                    ++count;
                                }
                            }
                        }

                        return (double)sum / count;
                    }
                }
            }

            return null;
        }

        /// <summary>Averages a sequence of <see cref="double"/>; the task faults with <see cref="InvalidOperationException"/> when the sequence is empty.</summary>
        public static Task<double> Average(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        private static async Task<double> Average_(IAsyncEnumerable<double> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    throw new InvalidOperationException(Strings.NO_ELEMENTS);
                }

                var sum = e.Current;
                long count = 1;
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    // There is an opportunity to short-circuit here, in that if e.Current is
                    // ever NaN then the result will always be NaN. Assuming that this case is
                    // rare enough that not checking is the better approach generally.
                    sum += e.Current;
                    ++count;
                }

                return sum / count;
            }
        }

        /// <summary>Averages the non-null values of a sequence of nullable <see cref="double"/>; yields null when there are none.</summary>
        public static Task<double?> Average(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        private static async Task<double?> Average_(IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    var v = e.Current;
                    if (v.HasValue)
                    {
                        var sum = v.GetValueOrDefault();
                        long count = 1;
                        checked
                        {
                            while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                            {
                                v = e.Current;
                                if (v.HasValue)
                                {
                                    sum += v.GetValueOrDefault();
                                    ++count;
                                }
                            }
                        }

                        return sum / count;
                    }
                }
            }

            return null;
        }

        /// <summary>Averages a sequence of <see cref="float"/>, accumulating in double precision; faults when the sequence is empty.</summary>
        public static Task<float> Average(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        private static async Task<float> Average_(IAsyncEnumerable<float> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    throw new InvalidOperationException(Strings.NO_ELEMENTS);
                }

                double sum = e.Current;
                long count = 1;
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    sum += e.Current;
                    ++count;
                }

                return (float)(sum / count);
            }
        }

        /// <summary>Averages the non-null values of a sequence of nullable <see cref="float"/>, accumulating in double precision; yields null when there are none.</summary>
        public static Task<float?> Average(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        private static async Task<float?> Average_(IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    var v = e.Current;
                    if (v.HasValue)
                    {
                        double sum = v.GetValueOrDefault();
                        long count = 1;
                        checked
                        {
                            while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                            {
                                v = e.Current;
                                if (v.HasValue)
                                {
                                    sum += v.GetValueOrDefault();
                                    ++count;
                                }
                            }
                        }

                        return (float)(sum / count);
                    }
                }
            }

            return null;
        }

        /// <summary>Averages a sequence of <see cref="decimal"/>; the task faults with <see cref="InvalidOperationException"/> when the sequence is empty.</summary>
        public static Task<decimal> Average(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        private static async Task<decimal> Average_(IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                if (!await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    throw new InvalidOperationException(Strings.NO_ELEMENTS);
                }

                var sum = e.Current;
                long count = 1;
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    sum += e.Current;
                    ++count;
                }

                return sum / count;
            }
        }

        /// <summary>Averages the non-null values of a sequence of nullable <see cref="decimal"/>; yields null when there are none.</summary>
        public static Task<decimal?> Average(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return Average_(source, cancellationToken);
        }

        private static async Task<decimal?> Average_(IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
        {
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                {
                    var v = e.Current;
                    if (v.HasValue)
                    {
                        var sum = v.GetValueOrDefault();
                        long count = 1;
                        while (await e.MoveNext(cancellationToken).ConfigureAwait(false))
                        {
                            v = e.Current;
                            if (v.HasValue)
                            {
                                sum += v.GetValueOrDefault();
                                ++count;
                            }
                        }

                        return sum / count;
                    }
                }
            }

            return null;
        }

        // Average with a selector: each overload projects with Select and defers to the
        // matching Average overload above.

        /// <summary>Averages the <see cref="int"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<double> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        /// <summary>Averages the nullable <see cref="int"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<double?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        /// <summary>Averages the <see cref="long"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<double> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        /// <summary>Averages the nullable <see cref="long"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<double?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        /// <summary>Averages the <see cref="double"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<double> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        /// <summary>Averages the nullable <see cref="double"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<double?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        /// <summary>Averages the <see cref="float"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<float> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        /// <summary>Averages the nullable <see cref="float"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<float?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        /// <summary>Averages the <see cref="decimal"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<decimal> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        /// <summary>Averages the nullable <see cref="decimal"/> values projected by <paramref name="selector"/>.</summary>
        public static Task<decimal?> Average<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Average(cancellationToken);
        }

        // Max over non-nullable numerics: a seedless Aggregate with Math.Max, so an empty
        // sequence faults the task with InvalidOperationException.

        /// <summary>Returns the largest <see cref="int"/> in the sequence.</summary>
        public static Task<int> Max(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Max, cancellationToken);
        }

        /// <summary>Returns the largest <see cref="long"/> in the sequence.</summary>
        public static Task<long> Max(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Max, cancellationToken);
        }

        /// <summary>Returns the largest <see cref="double"/> in the sequence.</summary>
        public static Task<double> Max(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Max, cancellationToken);
        }

        /// <summary>Returns the largest <see cref="float"/> in the sequence.</summary>
        public static Task<float> Max(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Max, cancellationToken);
        }

        /// <summary>Returns the largest <see cref="decimal"/> in the sequence.</summary>
        public static Task<decimal> Max(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Max, cancellationToken);
        }

        // Null-aware maximum: a missing operand yields the other one; ties keep x.
        static T? NullableMax<T>(T? x, T? y) where T : struct, IComparable<T>
        {
            if (!x.HasValue) return y;
            if (!y.HasValue) return x;
            if (x.Value.CompareTo(y.Value) >= 0) return x;

            return y;
        }

        // Max over nullable numerics: seeded with null, so an empty or all-null sequence yields null.

        /// <summary>Returns the largest non-null <see cref="int"/>, or null when there is none.</summary>
        public static Task<int?> Max(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(int?), NullableMax, cancellationToken);
        }

        /// <summary>Returns the largest non-null <see cref="long"/>, or null when there is none.</summary>
        public static Task<long?> Max(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(long?), NullableMax, cancellationToken);
        }

        /// <summary>Returns the largest non-null <see cref="double"/>, or null when there is none.</summary>
        public static Task<double?> Max(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(double?), NullableMax, cancellationToken);
        }

        /// <summary>Returns the largest non-null <see cref="float"/>, or null when there is none.</summary>
        public static Task<float?> Max(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(float?), NullableMax, cancellationToken);
        }

        /// <summary>Returns the largest non-null <see cref="decimal"/>, or null when there is none.</summary>
        public static Task<decimal?> Max(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(decimal?), NullableMax, cancellationToken);
        }

        /// <summary>Returns the largest element according to the default comparer; ties keep the earlier element. Faults when the sequence is empty.</summary>
        public static Task<TSource> Max<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            var comparer = Comparer<TSource>.Default;
            return source.Aggregate((x, y) => comparer.Compare(x, y) >= 0 ? x : y, cancellationToken);
        }

        // Max with a selector: each overload projects with Select and defers to the
        // matching Max overload above.

        /// <summary>Returns the largest <see cref="int"/> projected by <paramref name="selector"/>.</summary>
        public static Task<int> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest <see cref="long"/> projected by <paramref name="selector"/>.</summary>
        public static Task<long> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest <see cref="double"/> projected by <paramref name="selector"/>.</summary>
        public static Task<double> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest <see cref="float"/> projected by <paramref name="selector"/>.</summary>
        public static Task<float> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest <see cref="decimal"/> projected by <paramref name="selector"/>.</summary>
        public static Task<decimal> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest non-null <see cref="int"/> projected by <paramref name="selector"/>, or null.</summary>
        public static Task<int?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest non-null <see cref="long"/> projected by <paramref name="selector"/>, or null.</summary>
        public static Task<long?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest non-null <see cref="double"/> projected by <paramref name="selector"/>, or null.</summary>
        public static Task<double?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest non-null <see cref="float"/> projected by <paramref name="selector"/>, or null.</summary>
        public static Task<float?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest non-null <see cref="decimal"/> projected by <paramref name="selector"/>, or null.</summary>
        public static Task<decimal?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        /// <summary>Returns the largest value projected by <paramref name="selector"/> according to the default comparer.</summary>
        public static Task<TResult> Max<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Max(cancellationToken);
        }

        // Min over non-nullable numerics: a seedless Aggregate with Math.Min, so an empty
        // sequence faults the task with InvalidOperationException.

        /// <summary>Returns the smallest <see cref="int"/> in the sequence.</summary>
        public static Task<int> Min(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Min, cancellationToken);
        }

        /// <summary>Returns the smallest <see cref="long"/> in the sequence.</summary>
        public static Task<long> Min(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Min, cancellationToken);
        }

        /// <summary>Returns the smallest <see cref="double"/> in the sequence.</summary>
        public static Task<double> Min(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Min, cancellationToken);
        }

        /// <summary>Returns the smallest <see cref="float"/> in the sequence.</summary>
        public static Task<float> Min(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Min, cancellationToken);
        }

        /// <summary>Returns the smallest <see cref="decimal"/> in the sequence.</summary>
        public static Task<decimal> Min(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(Math.Min, cancellationToken);
        }

        // Null-aware minimum: a missing operand yields the other one; ties keep x.
        static T? NullableMin<T>(T? x, T? y) where T : struct, IComparable<T>
        {
            if (!x.HasValue) return y;
            if (!y.HasValue) return x;
            if (x.Value.CompareTo(y.Value) <= 0) return x;

            return y;
        }

        // Min over nullable numerics: seeded with null, so an empty or all-null sequence yields null.

        /// <summary>Returns the smallest non-null <see cref="int"/>, or null when there is none.</summary>
        public static Task<int?> Min(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(int?), NullableMin, cancellationToken);
        }

        /// <summary>Returns the smallest non-null <see cref="long"/>, or null when there is none.</summary>
        public static Task<long?> Min(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(long?), NullableMin, cancellationToken);
        }

        /// <summary>Returns the smallest non-null <see cref="double"/>, or null when there is none.</summary>
        public static Task<double?> Min(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(double?), NullableMin, cancellationToken);
        }

        /// <summary>Returns the smallest non-null <see cref="float"/>, or null when there is none.</summary>
        public static Task<float?> Min(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(float?), NullableMin, cancellationToken);
        }

        /// <summary>Returns the smallest non-null <see cref="decimal"/>, or null when there is none.</summary>
        public static Task<decimal?> Min(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            return source.Aggregate(default(decimal?), NullableMin, cancellationToken);
        }

        /// <summary>Returns the smallest element according to the default comparer; ties keep the earlier element. Faults when the sequence is empty.</summary>
        public static Task<TSource> Min<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));

            var comparer = Comparer<TSource>.Default;
            return source.Aggregate((x, y) => comparer.Compare(x, y) <= 0 ? x : y, cancellationToken);
        }

        // Min with a selector: each overload projects with Select and defers to the
        // matching Min overload above.

        /// <summary>Returns the smallest <see cref="int"/> projected by <paramref name="selector"/>.</summary>
        public static Task<int> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Min(cancellationToken);
        }

        /// <summary>Returns the smallest <see cref="long"/> projected by <paramref name="selector"/>.</summary>
        public static Task<long> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Min(cancellationToken);
        }

        /// <summary>Returns the smallest <see cref="double"/> projected by <paramref name="selector"/>.</summary>
        public static Task<double> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Min(cancellationToken);
        }

        /// <summary>Returns the smallest <see cref="float"/> projected by <paramref name="selector"/>.</summary>
        public static Task<float> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Min(cancellationToken);
        }

        /// <summary>Returns the smallest <see cref="decimal"/> projected by <paramref name="selector"/>.</summary>
        public static Task<decimal> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Min(cancellationToken);
        }

        // NOTE(review): this overload runs past the end of the visible section (its closing
        // brace follows below); the int? typing is inferred from the overload order used for
        // Max - confirm.
        public static Task<int?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return source.Select(selector).Min(cancellationToken);
} public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Min(cancellationToken); } public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Min(cancellationToken); } public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Min(cancellationToken); } public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Min(cancellationToken); } public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Min(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate(0, (x, y) => x + y, cancellationToken); } public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate(0L, (x, y) => x + y, cancellationToken); } public 
static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate(0.0, (x, y) => x + y, cancellationToken); } public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate(0f, (x, y) => x + y, cancellationToken); } public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate(0m, (x, y) => x + y, cancellationToken); } public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate((int?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken); } public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate((long?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken); } public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate((double?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken); } public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate((float?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken); } public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return source.Aggregate((decimal?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken); } public static Task Sum(this IAsyncEnumerable 
source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new 
ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.Select(selector).Sum(cancellationToken); } public static Task IsEmpty(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); return IsEmpty_(source, cancellationToken); } private static async Task IsEmpty_(IAsyncEnumerable source, CancellationToken cancellationToken) { return !await source.Any(cancellationToken).ConfigureAwait(false); } public static Task Min(this IAsyncEnumerable source, IComparer comparer, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Min_(source, comparer, cancellationToken); } private static async Task Min_(IAsyncEnumerable source, IComparer comparer, CancellationToken cancellationToken) { return (await MinBy(source, x => x, comparer, cancellationToken).ConfigureAwait(false)).First(); } public static Task> MinBy(this IAsyncEnumerable source, Func keySelector, 
CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); return MinBy(source, keySelector, Comparer.Default, cancellationToken); } public static Task> MinBy(this IAsyncEnumerable source, Func keySelector, IComparer comparer, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return ExtremaBy(source, keySelector, (key, minValue) => -comparer.Compare(key, minValue), cancellationToken); } public static Task Max(this IAsyncEnumerable source, IComparer comparer, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Max_(source, comparer, cancellationToken); } private static async Task Max_(IAsyncEnumerable source, IComparer comparer, CancellationToken cancellationToken) { return (await MaxBy(source, x => x, comparer, cancellationToken).ConfigureAwait(false)).First(); } public static Task> MaxBy(this IAsyncEnumerable source, Func keySelector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); return MaxBy(source, keySelector, Comparer.Default, cancellationToken); } public static Task> MaxBy(this IAsyncEnumerable source, Func keySelector, IComparer comparer, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return ExtremaBy(source, keySelector, (key, minValue) => 
comparer.Compare(key, minValue), cancellationToken); } private static async Task> ExtremaBy(IAsyncEnumerable source, Func keySelector, Func compare, CancellationToken cancellationToken) { var result = new List(); using (var e = source.GetEnumerator()) { if (!await e.MoveNext(cancellationToken).ConfigureAwait(false)) throw new InvalidOperationException(Strings.NO_ELEMENTS); var current = e.Current; var resKey = keySelector(current); result.Add(current); while (await e.MoveNext(cancellationToken).ConfigureAwait(false)) { var cur = e.Current; var key = keySelector(cur); var cmp = compare(key, resKey); if (cmp == 0) { result.Add(cur); } else if (cmp > 0) { result = new List { cur }; resKey = key; } } } return result; } } }