Parcourir la source

Async variants of Aggregate.

Bart De Smet il y a 8 ans
Parent
commit
01c87be60b

+ 111 - 0
Ix.NET/Source/System.Interactive.Async/Aggregate.cs

@@ -30,6 +30,26 @@ namespace System.Linq
             return Aggregate_(source, accumulator, cancellationToken);
         }
 
+        public static Task<TSource> Aggregate<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, Task<TSource>> accumulator)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (accumulator == null)
+                throw new ArgumentNullException(nameof(accumulator));
+
+            return Aggregate(source, accumulator, CancellationToken.None);
+        }
+
+        public static Task<TSource> Aggregate<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, Task<TSource>> accumulator, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (accumulator == null)
+                throw new ArgumentNullException(nameof(accumulator));
+
+            return Aggregate_(source, accumulator, cancellationToken);
+        }
+
         public static Task<TAccumulate> Aggregate<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
         {
             if (source == null)
@@ -50,6 +70,26 @@ namespace System.Linq
             return source.Aggregate(seed, accumulator, x => x, cancellationToken);
         }
 
+        public static Task<TAccumulate> Aggregate<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (accumulator == null)
+                throw new ArgumentNullException(nameof(accumulator));
+
+            return Aggregate(source, seed, accumulator, CancellationToken.None);
+        }
+
+        public static Task<TAccumulate> Aggregate<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (accumulator == null)
+                throw new ArgumentNullException(nameof(accumulator));
+
+            return source.Aggregate(seed, accumulator, x => Task.FromResult(x), cancellationToken);
+        }
+
         public static Task<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
         {
             if (source == null)
@@ -74,6 +114,30 @@ namespace System.Linq
             return Aggregate_(source, seed, accumulator, resultSelector, cancellationToken);
         }
 
+        public static Task<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator, Func<TAccumulate, Task<TResult>> resultSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (accumulator == null)
+                throw new ArgumentNullException(nameof(accumulator));
+            if (resultSelector == null)
+                throw new ArgumentNullException(nameof(resultSelector));
+
+            return Aggregate(source, seed, accumulator, resultSelector, CancellationToken.None);
+        }
+
+        public static Task<TResult> Aggregate<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator, Func<TAccumulate, Task<TResult>> resultSelector, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (accumulator == null)
+                throw new ArgumentNullException(nameof(accumulator));
+            if (resultSelector == null)
+                throw new ArgumentNullException(nameof(resultSelector));
+
+            return Aggregate_(source, seed, accumulator, resultSelector, cancellationToken);
+        }
+
         private static async Task<TResult> Aggregate_<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
         {
             var acc = seed;
@@ -120,5 +184,52 @@ namespace System.Linq
 
             return acc;
         }
+
+        private static async Task<TResult> Aggregate_<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator, Func<TAccumulate, Task<TResult>> resultSelector, CancellationToken cancellationToken)
+        {
+            var acc = seed;
+
+            var e = source.GetAsyncEnumerator();
+
+            try
+            {
+                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                {
+                    acc = await accumulator(acc, e.Current).ConfigureAwait(false);
+                }
+            }
+            finally
+            {
+                await e.DisposeAsync().ConfigureAwait(false);
+            }
+
+            return await resultSelector(acc).ConfigureAwait(false);
+        }
+
+        private static async Task<TSource> Aggregate_<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, Task<TSource>> accumulator, CancellationToken cancellationToken)
+        {
+            var first = true;
+            var acc = default(TSource);
+
+            var e = source.GetAsyncEnumerator();
+
+            try
+            {
+                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                {
+                    acc = first ? e.Current : await accumulator(acc, e.Current).ConfigureAwait(false);
+                    first = false;
+                }
+            }
+            finally
+            {
+                await e.DisposeAsync().ConfigureAwait(false);
+            }
+
+            if (first)
+                throw new InvalidOperationException(Strings.NO_ELEMENTS);
+
+            return acc;
+        }
     }
 }

+ 40 - 40
Ix.NET/Source/System.Interactive.Async/MinMax.Generated.cs

@@ -15,7 +15,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, CancellationToken.None);
+            return source.Aggregate(new Func<int, int, int>(Math.Max), CancellationToken.None);
         }
 
         public static Task<int> Max(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
@@ -23,7 +23,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, cancellationToken);
+            return source.Aggregate(new Func<int, int, int>(Math.Max), cancellationToken);
         }
 
         public static Task<int> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector)
@@ -71,7 +71,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, CancellationToken.None);
+            return source.Aggregate(new Func<long, long, long>(Math.Max), CancellationToken.None);
         }
 
         public static Task<long> Max(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
@@ -79,7 +79,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, cancellationToken);
+            return source.Aggregate(new Func<long, long, long>(Math.Max), cancellationToken);
         }
 
         public static Task<long> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector)
@@ -127,7 +127,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, CancellationToken.None);
+            return source.Aggregate(new Func<float, float, float>(Math.Max), CancellationToken.None);
         }
 
         public static Task<float> Max(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
@@ -135,7 +135,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, cancellationToken);
+            return source.Aggregate(new Func<float, float, float>(Math.Max), cancellationToken);
         }
 
         public static Task<float> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector)
@@ -183,7 +183,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, CancellationToken.None);
+            return source.Aggregate(new Func<double, double, double>(Math.Max), CancellationToken.None);
         }
 
         public static Task<double> Max(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
@@ -191,7 +191,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, cancellationToken);
+            return source.Aggregate(new Func<double, double, double>(Math.Max), cancellationToken);
         }
 
         public static Task<double> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector)
@@ -239,7 +239,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, CancellationToken.None);
+            return source.Aggregate(new Func<decimal, decimal, decimal>(Math.Max), CancellationToken.None);
         }
 
         public static Task<decimal> Max(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
@@ -247,7 +247,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Max, cancellationToken);
+            return source.Aggregate(new Func<decimal, decimal, decimal>(Math.Max), cancellationToken);
         }
 
         public static Task<decimal> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector)
@@ -295,7 +295,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(int?), NullableMax, CancellationToken.None);
+            return source.Aggregate(default(int?), new Func<int?, int?, int?>(NullableMax), CancellationToken.None);
         }
 
         public static Task<int?> Max(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
@@ -303,7 +303,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(int?), NullableMax, cancellationToken);
+            return source.Aggregate(default(int?), new Func<int?, int?, int?>(NullableMax), cancellationToken);
         }
 
         public static Task<int?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector)
@@ -351,7 +351,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(long?), NullableMax, CancellationToken.None);
+            return source.Aggregate(default(long?), new Func<long?, long?, long?>(NullableMax), CancellationToken.None);
         }
 
         public static Task<long?> Max(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
@@ -359,7 +359,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(long?), NullableMax, cancellationToken);
+            return source.Aggregate(default(long?), new Func<long?, long?, long?>(NullableMax), cancellationToken);
         }
 
         public static Task<long?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector)
@@ -407,7 +407,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(float?), NullableMax, CancellationToken.None);
+            return source.Aggregate(default(float?), new Func<float?, float?, float?>(NullableMax), CancellationToken.None);
         }
 
         public static Task<float?> Max(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
@@ -415,7 +415,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(float?), NullableMax, cancellationToken);
+            return source.Aggregate(default(float?), new Func<float?, float?, float?>(NullableMax), cancellationToken);
         }
 
         public static Task<float?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector)
@@ -463,7 +463,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(double?), NullableMax, CancellationToken.None);
+            return source.Aggregate(default(double?), new Func<double?, double?, double?>(NullableMax), CancellationToken.None);
         }
 
         public static Task<double?> Max(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
@@ -471,7 +471,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(double?), NullableMax, cancellationToken);
+            return source.Aggregate(default(double?), new Func<double?, double?, double?>(NullableMax), cancellationToken);
         }
 
         public static Task<double?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector)
@@ -519,7 +519,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(decimal?), NullableMax, CancellationToken.None);
+            return source.Aggregate(default(decimal?), new Func<decimal?, decimal?, decimal?>(NullableMax), CancellationToken.None);
         }
 
         public static Task<decimal?> Max(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
@@ -527,7 +527,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(decimal?), NullableMax, cancellationToken);
+            return source.Aggregate(default(decimal?), new Func<decimal?, decimal?, decimal?>(NullableMax), cancellationToken);
         }
 
         public static Task<decimal?> Max<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector)
@@ -575,7 +575,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, CancellationToken.None);
+            return source.Aggregate(new Func<int, int, int>(Math.Min), CancellationToken.None);
         }
 
         public static Task<int> Min(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
@@ -583,7 +583,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, cancellationToken);
+            return source.Aggregate(new Func<int, int, int>(Math.Min), cancellationToken);
         }
 
         public static Task<int> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int> selector)
@@ -631,7 +631,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, CancellationToken.None);
+            return source.Aggregate(new Func<long, long, long>(Math.Min), CancellationToken.None);
         }
 
         public static Task<long> Min(this IAsyncEnumerable<long> source, CancellationToken cancellationToken)
@@ -639,7 +639,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, cancellationToken);
+            return source.Aggregate(new Func<long, long, long>(Math.Min), cancellationToken);
         }
 
         public static Task<long> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long> selector)
@@ -687,7 +687,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, CancellationToken.None);
+            return source.Aggregate(new Func<float, float, float>(Math.Min), CancellationToken.None);
         }
 
         public static Task<float> Min(this IAsyncEnumerable<float> source, CancellationToken cancellationToken)
@@ -695,7 +695,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, cancellationToken);
+            return source.Aggregate(new Func<float, float, float>(Math.Min), cancellationToken);
         }
 
         public static Task<float> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float> selector)
@@ -743,7 +743,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, CancellationToken.None);
+            return source.Aggregate(new Func<double, double, double>(Math.Min), CancellationToken.None);
         }
 
         public static Task<double> Min(this IAsyncEnumerable<double> source, CancellationToken cancellationToken)
@@ -751,7 +751,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, cancellationToken);
+            return source.Aggregate(new Func<double, double, double>(Math.Min), cancellationToken);
         }
 
         public static Task<double> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double> selector)
@@ -799,7 +799,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, CancellationToken.None);
+            return source.Aggregate(new Func<decimal, decimal, decimal>(Math.Min), CancellationToken.None);
         }
 
         public static Task<decimal> Min(this IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
@@ -807,7 +807,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(Math.Min, cancellationToken);
+            return source.Aggregate(new Func<decimal, decimal, decimal>(Math.Min), cancellationToken);
         }
 
         public static Task<decimal> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal> selector)
@@ -855,7 +855,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(int?), NullableMin, CancellationToken.None);
+            return source.Aggregate(default(int?), new Func<int?, int?, int?>(NullableMin), CancellationToken.None);
         }
 
         public static Task<int?> Min(this IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
@@ -863,7 +863,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(int?), NullableMin, cancellationToken);
+            return source.Aggregate(default(int?), new Func<int?, int?, int?>(NullableMin), cancellationToken);
         }
 
         public static Task<int?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int?> selector)
@@ -911,7 +911,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(long?), NullableMin, CancellationToken.None);
+            return source.Aggregate(default(long?), new Func<long?, long?, long?>(NullableMin), CancellationToken.None);
         }
 
         public static Task<long?> Min(this IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
@@ -919,7 +919,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(long?), NullableMin, cancellationToken);
+            return source.Aggregate(default(long?), new Func<long?, long?, long?>(NullableMin), cancellationToken);
         }
 
         public static Task<long?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, long?> selector)
@@ -967,7 +967,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(float?), NullableMin, CancellationToken.None);
+            return source.Aggregate(default(float?), new Func<float?, float?, float?>(NullableMin), CancellationToken.None);
         }
 
         public static Task<float?> Min(this IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
@@ -975,7 +975,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(float?), NullableMin, cancellationToken);
+            return source.Aggregate(default(float?), new Func<float?, float?, float?>(NullableMin), cancellationToken);
         }
 
         public static Task<float?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, float?> selector)
@@ -1023,7 +1023,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(double?), NullableMin, CancellationToken.None);
+            return source.Aggregate(default(double?), new Func<double?, double?, double?>(NullableMin), CancellationToken.None);
         }
 
         public static Task<double?> Min(this IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
@@ -1031,7 +1031,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(double?), NullableMin, cancellationToken);
+            return source.Aggregate(default(double?), new Func<double?, double?, double?>(NullableMin), cancellationToken);
         }
 
         public static Task<double?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, double?> selector)
@@ -1079,7 +1079,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(decimal?), NullableMin, CancellationToken.None);
+            return source.Aggregate(default(decimal?), new Func<decimal?, decimal?, decimal?>(NullableMin), CancellationToken.None);
         }
 
         public static Task<decimal?> Min(this IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
@@ -1087,7 +1087,7 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return source.Aggregate(default(decimal?), NullableMin, cancellationToken);
+            return source.Aggregate(default(decimal?), new Func<decimal?, decimal?, decimal?>(NullableMin), cancellationToken);
         }
 
         public static Task<decimal?> Min<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, decimal?> selector)

+ 4 - 4
Ix.NET/Source/System.Interactive.Async/MinMax.Generated.tt

@@ -46,13 +46,13 @@ foreach (var m in new[] { "Max", "Min" })
 if (n)
 {
 #>
-            return source.Aggregate(default(<#=t#>), Nullable<#=m#>, CancellationToken.None);
+            return source.Aggregate(default(<#=t#>), new Func<<#=t#>, <#=t#>, <#=t#>>(Nullable<#=m#>), CancellationToken.None);
 <#
 }
 else
 {
 #>
-            return source.Aggregate(Math.<#=m#>, CancellationToken.None);
+            return source.Aggregate(new Func<<#=t#>, <#=t#>, <#=t#>>(Math.<#=m#>), CancellationToken.None);
 <#
 }
 #>
@@ -67,13 +67,13 @@ else
 if (n)
 {
 #>
-            return source.Aggregate(default(<#=t#>), Nullable<#=m#>, cancellationToken);
+            return source.Aggregate(default(<#=t#>), new Func<<#=t#>, <#=t#>, <#=t#>>(Nullable<#=m#>), cancellationToken);
 <#
 }
 else
 {
 #>
-            return source.Aggregate(Math.<#=m#>, cancellationToken);
+            return source.Aggregate(new Func<<#=t#>, <#=t#>, <#=t#>>(Math.<#=m#>), cancellationToken);
 <#
 }
 #>

+ 10 - 10
Ix.NET/Source/Tests/AsyncTests.Aggregates.cs

@@ -23,24 +23,24 @@ namespace Tests
         public async Task Aggregate_Null()
         {
             await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int>(null, (x, y) => x + y));
-            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int>(AsyncEnumerable.Return(42), null));
+            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int>(AsyncEnumerable.Return(42), default(Func<int, int, int>)));
 
             await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int>(null, 0, (x, y) => x + y));
-            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int>(AsyncEnumerable.Return(42), 0, null));
+            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int>(AsyncEnumerable.Return(42), 0, default(Func<int, int, int>)));
 
             await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(null, 0, (x, y) => x + y, z => z));
-            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(AsyncEnumerable.Return(42), 0, null, z => z));
-            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(AsyncEnumerable.Return(42), 0, (x, y) => x + y, null));
+            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(AsyncEnumerable.Return(42), 0, default(Func<int, int, int>), z => z));
+            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(AsyncEnumerable.Return(42), 0, (x, y) => x + y, default(Func<int, int>)));
 
             await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int>(null, (x, y) => x + y, CancellationToken.None));
-            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int>(AsyncEnumerable.Return(42), null, CancellationToken.None));
+            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int>(AsyncEnumerable.Return(42), default(Func<int, int, int>), CancellationToken.None));
 
             await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int>(null, 0, (x, y) => x + y, CancellationToken.None));
-            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int>(AsyncEnumerable.Return(42), 0, null, CancellationToken.None));
+            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int>(AsyncEnumerable.Return(42), 0, default(Func<int, int, int>), CancellationToken.None));
 
             await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(null, 0, (x, y) => x + y, z => z, CancellationToken.None));
-            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(AsyncEnumerable.Return(42), 0, null, z => z, CancellationToken.None));
-            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(AsyncEnumerable.Return(42), 0, (x, y) => x + y, null, CancellationToken.None));
+            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(AsyncEnumerable.Return(42), 0, default(Func<int, int, int>), z => z, CancellationToken.None));
+            await Assert.ThrowsAsync<ArgumentNullException>(() => AsyncEnumerable.Aggregate<int, int, int>(AsyncEnumerable.Return(42), 0, (x, y) => x + y, default(Func<int, int>), CancellationToken.None));
         }
 
         [Fact]
@@ -73,7 +73,7 @@ namespace Tests
         {
             var ex = new Exception("Bang!");
             var xs = new[] { 1, 2, 3, 4 }.ToAsyncEnumerable();
-            var ys = xs.Aggregate((x, y) => { throw ex; });
+            var ys = xs.Aggregate(new Func<int, int, int>((x, y) => { throw ex; }));
             AssertThrows<Exception>(() => ys.Wait(WaitTimeoutMs), ex_ => ((AggregateException)ex_).Flatten().InnerExceptions.Single() == ex);
         }
 
@@ -107,7 +107,7 @@ namespace Tests
         {
             var ex = new Exception("Bang!");
             var xs = new[] { 1, 2, 3, 4 }.ToAsyncEnumerable();
-            var ys = xs.Aggregate(1, (x, y) => { throw ex; });
+            var ys = xs.Aggregate(1, new Func<int, int, int>((x, y) => { throw ex; }));
             AssertThrows<Exception>(() => ys.Wait(WaitTimeoutMs), ex_ => ((AggregateException)ex_).Flatten().InnerExceptions.Single() == ex);
         }