// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace System.Linq
{
public static partial class AsyncEnumerable
{
/// <summary>
/// Applies an accumulator function over an async-enumerable sequence and asynchronously returns the final accumulator value.
/// The first element of the sequence is used as the initial accumulator value.
/// For aggregation behavior with incremental intermediate results, see System.Interactive.Async.AsyncEnumerableEx.Scan{TSource}.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence and the result of the aggregation.</typeparam>
/// <param name="source">An async-enumerable sequence to aggregate over.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> containing the final accumulator value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
/// <exception cref="InvalidOperationException">(Asynchronous) The source sequence is empty.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
public static ValueTask<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken = default)
{
    // Argument validation lives outside the async local function so that
    // null arguments throw synchronously at the call site.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (accumulator == null)
        throw Error.ArgumentNull(nameof(accumulator));

    return Core(source, accumulator, cancellationToken);

    static async ValueTask<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
    {
        // NOTE(review): the 'false' argument is presumably continueOnCapturedContext - confirm against the helper.
        await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

        // Without a seed there is nothing to return for an empty sequence.
        if (!await e.MoveNextAsync())
        {
            throw Error.NoElements();
        }

        // The first element seeds the accumulator; the accumulator function
        // is only invoked from the second element onwards.
        var acc = e.Current;

        while (await e.MoveNextAsync())
        {
            acc = accumulator(acc, e.Current);
        }

        return acc;
    }
}
/// <summary>
/// Applies an asynchronous accumulator function over an async-enumerable sequence and asynchronously returns the final accumulator value.
/// The first element of the sequence is used as the initial accumulator value.
/// </summary>
/// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
/// <param name="source">An async-enumerable sequence to aggregate over.</param>
/// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
/// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
/// <returns>A ValueTask containing the final accumulator value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
/// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
[GenerateAsyncOverload]
private static ValueTask<TSource> AggregateAwaitAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
{
    // Argument validation lives outside the async local function so that
    // null arguments throw synchronously at the call site.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (accumulator == null)
        throw Error.ArgumentNull(nameof(accumulator));

    return Core(source, accumulator, cancellationToken);

    static async ValueTask<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
    {
        // NOTE(review): the 'false' argument is presumably continueOnCapturedContext - confirm against the helper.
        await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

        // Without a seed there is nothing to return for an empty sequence.
        if (!await e.MoveNextAsync())
        {
            throw Error.NoElements();
        }

        // The first element seeds the accumulator.
        var acc = e.Current;

        while (await e.MoveNextAsync())
        {
            // Each accumulation step is awaited before the next element is requested.
            acc = await accumulator(acc, e.Current).ConfigureAwait(false);
        }

        return acc;
    }
}
#if !NO_DEEP_CANCELLATION
/// <summary>
/// Applies an asynchronous, cancellable accumulator function over an async-enumerable sequence and asynchronously returns the final accumulator value.
/// The first element of the sequence is used as the initial accumulator value.
/// </summary>
/// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
/// <param name="source">An async-enumerable sequence to aggregate over.</param>
/// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element; it receives <paramref name="cancellationToken"/>.</param>
/// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
/// <returns>A ValueTask containing the final accumulator value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
/// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
[GenerateAsyncOverload]
private static ValueTask<TSource> AggregateAwaitWithCancellationAsyncCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
{
    // Argument validation lives outside the async local function so that
    // null arguments throw synchronously at the call site.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (accumulator == null)
        throw Error.ArgumentNull(nameof(accumulator));

    return Core(source, accumulator, cancellationToken);

    static async ValueTask<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
    {
        // NOTE(review): the 'false' argument is presumably continueOnCapturedContext - confirm against the helper.
        await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

        // Without a seed there is nothing to return for an empty sequence.
        if (!await e.MoveNextAsync())
        {
            throw Error.NoElements();
        }

        // The first element seeds the accumulator.
        var acc = e.Current;

        while (await e.MoveNextAsync())
        {
            // The same token that governs enumeration is handed to the accumulator.
            acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
        }

        return acc;
    }
}
#endif
/// <summary>
/// Applies an accumulator function over an async-enumerable sequence and asynchronously returns the final accumulator value. The specified seed value is used as the initial accumulator value.
/// For aggregation behavior with incremental intermediate results, see System.Interactive.Async.AsyncEnumerableEx.Scan{TSource, TAccumulate}.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TAccumulate">The type of the result of the aggregation.</typeparam>
/// <param name="source">An async-enumerable sequence to aggregate over.</param>
/// <param name="seed">The initial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> containing the final accumulator value; this is <paramref name="seed"/> when the sequence is empty.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
public static ValueTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken = default)
{
    // Argument validation lives outside the async local function so that
    // null arguments throw synchronously at the call site. The seed is not
    // validated: it is passed through to the accumulator as-is.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (accumulator == null)
        throw Error.ArgumentNull(nameof(accumulator));

    return Core(source, seed, accumulator, cancellationToken);

    static async ValueTask<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken)
    {
        var acc = seed;

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            acc = accumulator(acc, item);
        }

        return acc;
    }
}
/// <summary>
/// Applies an asynchronous accumulator function over an async-enumerable sequence and asynchronously returns the final accumulator value. The specified seed value is used as the initial accumulator value.
/// </summary>
/// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
/// <typeparam name="TAccumulate">The type of the result of aggregation.</typeparam>
/// <param name="source">An async-enumerable sequence to aggregate over.</param>
/// <param name="seed">The initial accumulator value.</param>
/// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
/// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
/// <returns>A ValueTask containing the final accumulator value; this is <paramref name="seed"/> when the sequence is empty.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
[GenerateAsyncOverload]
private static ValueTask<TAccumulate> AggregateAwaitAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
{
    // Argument validation lives outside the async local function so that
    // null arguments throw synchronously at the call site.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (accumulator == null)
        throw Error.ArgumentNull(nameof(accumulator));

    return Core(source, seed, accumulator, cancellationToken);

    static async ValueTask<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
    {
        var acc = seed;

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            // Each accumulation step is awaited before the next element is requested.
            acc = await accumulator(acc, item).ConfigureAwait(false);
        }

        return acc;
    }
}
#if !NO_DEEP_CANCELLATION
/// <summary>
/// Applies an asynchronous, cancellable accumulator function over an async-enumerable sequence and asynchronously returns the final accumulator value. The specified seed value is used as the initial accumulator value.
/// </summary>
/// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
/// <typeparam name="TAccumulate">The type of the result of aggregation.</typeparam>
/// <param name="source">An async-enumerable sequence to aggregate over.</param>
/// <param name="seed">The initial accumulator value.</param>
/// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element; it receives <paramref name="cancellationToken"/>.</param>
/// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
/// <returns>A ValueTask containing the final accumulator value; this is <paramref name="seed"/> when the sequence is empty.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is <see langword="null"/>.</exception>
[GenerateAsyncOverload]
private static ValueTask<TAccumulate> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
{
    // Argument validation lives outside the async local function so that
    // null arguments throw synchronously at the call site.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (accumulator == null)
        throw Error.ArgumentNull(nameof(accumulator));

    return Core(source, seed, accumulator, cancellationToken);

    static async ValueTask<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
    {
        var acc = seed;

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            // The same token that governs enumeration is handed to the accumulator.
            acc = await accumulator(acc, item, cancellationToken).ConfigureAwait(false);
        }

        return acc;
    }
}
#endif
/// <summary>
/// Applies an accumulator function over an async-enumerable sequence and asynchronously returns the projected result. The specified seed value is used as the initial accumulator value,
/// and the specified result selector function is used to select the result value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
/// <typeparam name="TResult">The type of the resulting value.</typeparam>
/// <param name="source">An async-enumerable sequence to aggregate over.</param>
/// <param name="seed">The initial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <param name="resultSelector">A function to transform the final accumulator value into the result value.</param>
/// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> containing the value obtained by applying the result selector to the final accumulator value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
public static ValueTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
{
    // Argument validation lives outside the async local function so that
    // null arguments throw synchronously at the call site.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (accumulator == null)
        throw Error.ArgumentNull(nameof(accumulator));
    if (resultSelector == null)
        throw Error.ArgumentNull(nameof(resultSelector));

    return Core(source, seed, accumulator, resultSelector, cancellationToken);

    static async ValueTask<TResult> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
    {
        var acc = seed;

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            acc = accumulator(acc, item);
        }

        // The selector runs exactly once, after the sequence has been fully consumed.
        return resultSelector(acc);
    }
}
/// <summary>
/// Applies an asynchronous accumulator function over an async-enumerable sequence and asynchronously returns the projected result. The specified seed value is used as the initial accumulator value,
/// and the specified result selector is used to select the result value.
/// </summary>
/// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
/// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
/// <typeparam name="TResult">The type of the resulting value.</typeparam>
/// <param name="source">An async-enumerable sequence to aggregate over.</param>
/// <param name="seed">The initial accumulator value.</param>
/// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element.</param>
/// <param name="resultSelector">An asynchronous transform function to transform the final accumulator value into the result value.</param>
/// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
/// <returns>A ValueTask containing the value obtained by applying the result selector to the final accumulator value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
[GenerateAsyncOverload]
private static ValueTask<TResult> AggregateAwaitAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
{
    // Argument validation lives outside the async local function so that
    // null arguments throw synchronously at the call site.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (accumulator == null)
        throw Error.ArgumentNull(nameof(accumulator));
    if (resultSelector == null)
        throw Error.ArgumentNull(nameof(resultSelector));

    return Core(source, seed, accumulator, resultSelector, cancellationToken);

    static async ValueTask<TResult> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
    {
        var acc = seed;

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            // Each accumulation step is awaited before the next element is requested.
            acc = await accumulator(acc, item).ConfigureAwait(false);
        }

        // The selector runs exactly once, after the sequence has been fully consumed.
        return await resultSelector(acc).ConfigureAwait(false);
    }
}
#if !NO_DEEP_CANCELLATION
/// <summary>
/// Applies an asynchronous, cancellable accumulator function over an async-enumerable sequence and asynchronously returns the projected result. The specified seed value is used as the initial accumulator value,
/// and the specified cancellable result selector is used to select the result value.
/// </summary>
/// <typeparam name="TSource">The type of elements in the source sequence.</typeparam>
/// <typeparam name="TAccumulate">The type of the accumulator value.</typeparam>
/// <typeparam name="TResult">The type of the resulting value.</typeparam>
/// <param name="source">An async-enumerable sequence to aggregate over.</param>
/// <param name="seed">The initial accumulator value.</param>
/// <param name="accumulator">An asynchronous accumulator function to be invoked and awaited on each element; it receives <paramref name="cancellationToken"/>.</param>
/// <param name="resultSelector">An asynchronous transform function to transform the final accumulator value into the result value; it receives <paramref name="cancellationToken"/>.</param>
/// <param name="cancellationToken">An optional cancellation token to be used for cancelling the sequence at any time.</param>
/// <returns>A ValueTask containing the value obtained by applying the result selector to the final accumulator value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> or <paramref name="resultSelector"/> is <see langword="null"/>.</exception>
[GenerateAsyncOverload]
private static ValueTask<TResult> AggregateAwaitWithCancellationAsyncCore<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
{
    // Argument validation lives outside the async local function so that
    // null arguments throw synchronously at the call site.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (accumulator == null)
        throw Error.ArgumentNull(nameof(accumulator));
    if (resultSelector == null)
        throw Error.ArgumentNull(nameof(resultSelector));

    return Core(source, seed, accumulator, resultSelector, cancellationToken);

    static async ValueTask<TResult> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
    {
        var acc = seed;

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            // The same token that governs enumeration is handed to the accumulator.
            acc = await accumulator(acc, item, cancellationToken).ConfigureAwait(false);
        }

        // The selector runs exactly once, after the sequence has been fully consumed,
        // and also observes the caller's token.
        return await resultSelector(acc, cancellationToken).ConfigureAwait(false);
    }
}
#endif
}
}