|
@@ -17,7 +17,33 @@ namespace System.Linq
|
|
|
if (accumulator == null)
|
|
|
throw Error.ArgumentNull(nameof(accumulator));
|
|
|
|
|
|
- return AggregateCore(source, accumulator, cancellationToken);
|
|
|
+ return Core();
|
|
|
+
|
|
|
+ async Task<TSource> Core()
|
|
|
+ {
|
|
|
+ var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (!await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ throw Error.NoElements();
|
|
|
+ }
|
|
|
+
|
|
|
+ var acc = e.Current;
|
|
|
+
|
|
|
+ while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ acc = accumulator(acc, e.Current);
|
|
|
+ }
|
|
|
+
|
|
|
+ return acc;
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken = default)
|
|
@@ -27,7 +53,33 @@ namespace System.Linq
|
|
|
if (accumulator == null)
|
|
|
throw Error.ArgumentNull(nameof(accumulator));
|
|
|
|
|
|
- return AggregateCore(source, accumulator, cancellationToken);
|
|
|
+ return Core();
|
|
|
+
|
|
|
+ async Task<TSource> Core()
|
|
|
+ {
|
|
|
+ var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (!await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ throw Error.NoElements();
|
|
|
+ }
|
|
|
+
|
|
|
+ var acc = e.Current;
|
|
|
+
|
|
|
+ while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ acc = await accumulator(acc, e.Current).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ return acc;
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
#if !NO_DEEP_CANCELLATION
|
|
@@ -38,7 +90,33 @@ namespace System.Linq
|
|
|
if (accumulator == null)
|
|
|
throw Error.ArgumentNull(nameof(accumulator));
|
|
|
|
|
|
- return AggregateCore(source, accumulator, cancellationToken);
|
|
|
+ return Core();
|
|
|
+
|
|
|
+ async Task<TSource> Core()
|
|
|
+ {
|
|
|
+ var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (!await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ throw Error.NoElements();
|
|
|
+ }
|
|
|
+
|
|
|
+ var acc = e.Current;
|
|
|
+
|
|
|
+ while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ return acc;
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
#endif
|
|
|
|
|
@@ -49,254 +127,191 @@ namespace System.Linq
|
|
|
if (accumulator == null)
|
|
|
throw Error.ArgumentNull(nameof(accumulator));
|
|
|
|
|
|
- return AggregateCore(source, seed, accumulator, x => x, cancellationToken);
|
|
|
- }
|
|
|
+ return Core();
|
|
|
|
|
|
- public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
|
|
|
- {
|
|
|
- if (source == null)
|
|
|
- throw Error.ArgumentNull(nameof(source));
|
|
|
- if (accumulator == null)
|
|
|
- throw Error.ArgumentNull(nameof(accumulator));
|
|
|
+ async Task<TAccumulate> Core()
|
|
|
+ {
|
|
|
+ var acc = seed;
|
|
|
|
|
|
- return AggregateCore(source, seed, accumulator, cancellationToken);
|
|
|
- }
|
|
|
+ var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
|
|
|
-#if !NO_DEEP_CANCELLATION
|
|
|
- public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
|
|
|
- {
|
|
|
- if (source == null)
|
|
|
- throw Error.ArgumentNull(nameof(source));
|
|
|
- if (accumulator == null)
|
|
|
- throw Error.ArgumentNull(nameof(accumulator));
|
|
|
+ try
|
|
|
+ {
|
|
|
+ while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ acc = accumulator(acc, e.Current);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
|
|
|
- return AggregateCore(source, seed, accumulator, cancellationToken);
|
|
|
+ return acc;
|
|
|
+ }
|
|
|
}
|
|
|
-#endif
|
|
|
|
|
|
- public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
|
|
|
+ public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
|
|
|
{
|
|
|
if (source == null)
|
|
|
throw Error.ArgumentNull(nameof(source));
|
|
|
if (accumulator == null)
|
|
|
throw Error.ArgumentNull(nameof(accumulator));
|
|
|
- if (resultSelector == null)
|
|
|
- throw Error.ArgumentNull(nameof(resultSelector));
|
|
|
|
|
|
- return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
|
|
|
- }
|
|
|
+ return Core();
|
|
|
|
|
|
- public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
|
|
|
- {
|
|
|
- if (source == null)
|
|
|
- throw Error.ArgumentNull(nameof(source));
|
|
|
- if (accumulator == null)
|
|
|
- throw Error.ArgumentNull(nameof(accumulator));
|
|
|
- if (resultSelector == null)
|
|
|
- throw Error.ArgumentNull(nameof(resultSelector));
|
|
|
+ async Task<TAccumulate> Core()
|
|
|
+ {
|
|
|
+ var acc = seed;
|
|
|
+
|
|
|
+ var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ acc = await accumulator(acc, e.Current).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
|
|
|
- return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
|
|
|
+ return acc;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
#if !NO_DEEP_CANCELLATION
|
|
|
- public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
|
|
|
+ public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
|
|
|
{
|
|
|
if (source == null)
|
|
|
throw Error.ArgumentNull(nameof(source));
|
|
|
if (accumulator == null)
|
|
|
throw Error.ArgumentNull(nameof(accumulator));
|
|
|
- if (resultSelector == null)
|
|
|
- throw Error.ArgumentNull(nameof(resultSelector));
|
|
|
-
|
|
|
- return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
|
|
|
- }
|
|
|
-#endif
|
|
|
-
|
|
|
- private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
|
|
|
- {
|
|
|
- var acc = seed;
|
|
|
|
|
|
- var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+ return Core();
|
|
|
|
|
|
- try
|
|
|
- {
|
|
|
- while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
- {
|
|
|
- acc = accumulator(acc, e.Current);
|
|
|
- }
|
|
|
- }
|
|
|
- finally
|
|
|
+ async Task<TAccumulate> Core()
|
|
|
{
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
+ var acc = seed;
|
|
|
|
|
|
- return resultSelector(acc);
|
|
|
- }
|
|
|
-
|
|
|
- private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
|
|
|
- {
|
|
|
- var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+ var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
|
|
|
- try
|
|
|
- {
|
|
|
- if (!await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ try
|
|
|
{
|
|
|
- throw Error.NoElements();
|
|
|
+ while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- var acc = e.Current;
|
|
|
-
|
|
|
- while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ finally
|
|
|
{
|
|
|
- acc = accumulator(acc, e.Current);
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
}
|
|
|
|
|
|
return acc;
|
|
|
}
|
|
|
- finally
|
|
|
- {
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
}
|
|
|
+#endif
|
|
|
|
|
|
- private static async Task<TResult> AggregateCore<TSource, TResult>(IAsyncEnumerable<TSource> source, TResult seed, Func<TResult, TSource, ValueTask<TResult>> accumulator, CancellationToken cancellationToken)
|
|
|
+ public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
|
|
|
{
|
|
|
- var acc = seed;
|
|
|
+ if (source == null)
|
|
|
+ throw Error.ArgumentNull(nameof(source));
|
|
|
+ if (accumulator == null)
|
|
|
+ throw Error.ArgumentNull(nameof(accumulator));
|
|
|
+ if (resultSelector == null)
|
|
|
+ throw Error.ArgumentNull(nameof(resultSelector));
|
|
|
|
|
|
- var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+ return Core();
|
|
|
|
|
|
- try
|
|
|
+ async Task<TResult> Core()
|
|
|
{
|
|
|
- while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
- {
|
|
|
- acc = await accumulator(acc, e.Current).ConfigureAwait(false);
|
|
|
- }
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
+ var acc = seed;
|
|
|
|
|
|
- return acc;
|
|
|
- }
|
|
|
+ var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
|
|
|
-#if !NO_DEEP_CANCELLATION
|
|
|
- private static async Task<TResult> AggregateCore<TSource, TResult>(IAsyncEnumerable<TSource> source, TResult seed, Func<TResult, TSource, CancellationToken, ValueTask<TResult>> accumulator, CancellationToken cancellationToken)
|
|
|
- {
|
|
|
- var acc = seed;
|
|
|
-
|
|
|
- var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
-
|
|
|
- try
|
|
|
- {
|
|
|
- while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ try
|
|
|
{
|
|
|
- acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
|
|
|
+ while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ acc = accumulator(acc, e.Current);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
-
|
|
|
- return acc;
|
|
|
- }
|
|
|
-#endif
|
|
|
-
|
|
|
- private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
|
|
|
- {
|
|
|
- var acc = seed;
|
|
|
-
|
|
|
- var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
-
|
|
|
- try
|
|
|
- {
|
|
|
- while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ finally
|
|
|
{
|
|
|
- acc = await accumulator(acc, e.Current).ConfigureAwait(false);
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
}
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
|
|
|
- return await resultSelector(acc).ConfigureAwait(false);
|
|
|
+ return resultSelector(acc);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-#if !NO_DEEP_CANCELLATION
|
|
|
- private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
|
|
|
+ public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
|
|
|
{
|
|
|
- var acc = seed;
|
|
|
+ if (source == null)
|
|
|
+ throw Error.ArgumentNull(nameof(source));
|
|
|
+ if (accumulator == null)
|
|
|
+ throw Error.ArgumentNull(nameof(accumulator));
|
|
|
+ if (resultSelector == null)
|
|
|
+ throw Error.ArgumentNull(nameof(resultSelector));
|
|
|
|
|
|
- var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+ return Core();
|
|
|
|
|
|
- try
|
|
|
- {
|
|
|
- while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
- {
|
|
|
- acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
|
|
|
- }
|
|
|
- }
|
|
|
- finally
|
|
|
+ async Task<TResult> Core()
|
|
|
{
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
+ var acc = seed;
|
|
|
|
|
|
- return await resultSelector(acc, cancellationToken).ConfigureAwait(false);
|
|
|
- }
|
|
|
-#endif
|
|
|
-
|
|
|
- private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
|
|
|
- {
|
|
|
- var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+ var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
|
|
|
- try
|
|
|
- {
|
|
|
- if (!await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ try
|
|
|
{
|
|
|
- throw Error.NoElements();
|
|
|
+ while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ acc = await accumulator(acc, e.Current).ConfigureAwait(false);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- var acc = e.Current;
|
|
|
-
|
|
|
- while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ finally
|
|
|
{
|
|
|
- acc = await accumulator(acc, e.Current).ConfigureAwait(false);
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
}
|
|
|
|
|
|
- return acc;
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ return await resultSelector(acc).ConfigureAwait(false);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#if !NO_DEEP_CANCELLATION
|
|
|
- private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
|
|
|
+ public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
|
|
|
{
|
|
|
- var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
+ if (source == null)
|
|
|
+ throw Error.ArgumentNull(nameof(source));
|
|
|
+ if (accumulator == null)
|
|
|
+ throw Error.ArgumentNull(nameof(accumulator));
|
|
|
+ if (resultSelector == null)
|
|
|
+ throw Error.ArgumentNull(nameof(resultSelector));
|
|
|
+
|
|
|
+ return Core();
|
|
|
|
|
|
- try
|
|
|
+ async Task<TResult> Core()
|
|
|
{
|
|
|
- if (!await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
- {
|
|
|
- throw Error.NoElements();
|
|
|
- }
|
|
|
+ var acc = seed;
|
|
|
|
|
|
- var acc = e.Current;
|
|
|
+ var e = source.GetAsyncEnumerator(cancellationToken);
|
|
|
|
|
|
- while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ try
|
|
|
{
|
|
|
- acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
|
|
|
+ while (await e.MoveNextAsync().ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ await e.DisposeAsync().ConfigureAwait(false);
|
|
|
}
|
|
|
|
|
|
- return acc;
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- await e.DisposeAsync().ConfigureAwait(false);
|
|
|
+ return await resultSelector(acc, cancellationToken).ConfigureAwait(false);
|
|
|
}
|
|
|
}
|
|
|
#endif
|