Browse Source

Adding Average.

Bart De Smet 8 năm trước cách đây
mục cha
commit
1df869463d

+ 16 - 2
AsyncRx.NET/System.Reactive.Async/System.Reactive.Async.csproj

@@ -5,8 +5,12 @@
   </PropertyGroup>
 
   <ItemGroup>
+    <None Update="System\Reactive\Linq\Operators\Average.Generated.tt">
+      <LastGenOutput>Average.Generated.cs</LastGenOutput>
+      <Generator>TextTemplatingFileGenerator</Generator>
+    </None>
     <None Update="System\Reactive\Linq\Operators\Min.Generated.tt">
-      <LastGenOutput>Min.Generated.cs</LastGenOutput>
+      <LastGenOutput>Min - Copy.Generated.cs</LastGenOutput>
       <Generator>TextTemplatingFileGenerator</Generator>
     </None>
     <None Update="System\Reactive\Linq\Operators\Max.Generated.tt">
@@ -20,6 +24,11 @@
   </ItemGroup>
 
   <ItemGroup>
+    <Compile Update="System\Reactive\Linq\Operators\Average.Generated.cs">
+      <DesignTime>True</DesignTime>
+      <AutoGen>True</AutoGen>
+      <DependentUpon>Average.Generated.tt</DependentUpon>
+    </Compile>
     <Compile Update="System\Reactive\Linq\Operators\Max - Copy.Generated.cs">
       <DependentUpon>Max - Copy.Generated.tt</DependentUpon>
       <DesignTime>True</DesignTime>
@@ -30,7 +39,12 @@
       <AutoGen>True</AutoGen>
       <DependentUpon>Max.Generated.tt</DependentUpon>
     </Compile>
-    <Compile Update="System\Reactive\Linq\Operators\Min.Generated.cs">
+    <Compile Update="System\Reactive\Linq\Operators\Min - Copy.Generated.cs">
+      <DependentUpon>Min - Copy.Generated.tt</DependentUpon>
+      <DesignTime>True</DesignTime>
+      <AutoGen>True</AutoGen>
+    </Compile>
+    <Compile Update="System\Reactive\Linq\Operators\Min - Copy.Generated.cs">
       <DesignTime>True</DesignTime>
       <AutoGen>True</AutoGen>
       <DependentUpon>Min.Generated.tt</DependentUpon>

+ 496 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Average.Generated.cs

@@ -0,0 +1,496 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+        public static IAsyncObservable<Double> Average(this IAsyncObservable<Int32> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.AverageInt32(observer)));
+        }
+
+        public static IAsyncObservable<Double> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int32> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.AverageInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int32>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.AverageInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Average(this IAsyncObservable<Int32?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableInt32(observer)));
+        }
+
+        public static IAsyncObservable<Double?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int32?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int32?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Average(this IAsyncObservable<Int64> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.AverageInt64(observer)));
+        }
+
+        public static IAsyncObservable<Double> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int64> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.AverageInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int64>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.AverageInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Average(this IAsyncObservable<Int64?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableInt64(observer)));
+        }
+
+        public static IAsyncObservable<Double?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int64?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int64?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single> Average(this IAsyncObservable<Single> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.AverageSingle(observer)));
+        }
+
+        public static IAsyncObservable<Single> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Single> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.AverageSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Single>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.AverageSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single?> Average(this IAsyncObservable<Single?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableSingle(observer)));
+        }
+
+        public static IAsyncObservable<Single?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Single?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Single?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Average(this IAsyncObservable<Double> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.AverageDouble(observer)));
+        }
+
+        public static IAsyncObservable<Double> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Double> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.AverageDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Double>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.AverageDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Average(this IAsyncObservable<Double?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableDouble(observer)));
+        }
+
+        public static IAsyncObservable<Double?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Double?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Double?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal> Average(this IAsyncObservable<Decimal> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.AverageDecimal(observer)));
+        }
+
+        public static IAsyncObservable<Decimal> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Decimal> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.AverageDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Decimal>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.AverageDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal?> Average(this IAsyncObservable<Decimal?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableDecimal(observer)));
+        }
+
+        public static IAsyncObservable<Decimal?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Decimal?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal?> Average<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Decimal?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.AverageNullableDecimal(observer, selector)));
+        }
+
+    }
+
+    partial class AsyncObserver
+    {
+        public static IAsyncObserver<TSource> AverageInt32<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Int32> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageInt32<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Task<Int32>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableInt32<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Int32?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableInt32<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Task<Int32?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageInt64<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Int64> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageInt64<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Task<Int64>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableInt64<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Int64?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableInt64<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Task<Int64?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageSingle<TSource>(this IAsyncObserver<Single> observer, Func<TSource, Single> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageSingle<TSource>(this IAsyncObserver<Single> observer, Func<TSource, Task<Single>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableSingle<TSource>(this IAsyncObserver<Single?> observer, Func<TSource, Single?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableSingle<TSource>(this IAsyncObserver<Single?> observer, Func<TSource, Task<Single?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageDouble<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Double> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageDouble<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Task<Double>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableDouble<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Double?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableDouble<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Task<Double?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageDecimal<TSource>(this IAsyncObserver<Decimal> observer, Func<TSource, Decimal> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageDecimal<TSource>(this IAsyncObserver<Decimal> observer, Func<TSource, Task<Decimal>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableDecimal<TSource>(this IAsyncObserver<Decimal?> observer, Func<TSource, Decimal?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> AverageNullableDecimal<TSource>(this IAsyncObserver<Decimal?> observer, Func<TSource, Task<Decimal?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(AverageNullableDecimal(observer), selector);
+        }
+
+    }
+}

+ 115 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Average.Generated.tt

@@ -0,0 +1,115 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+<#@ template debug="false" hostspecific="false" language="C#" #>
+<#@ assembly name="System.Core" #>
+<#@ import namespace="System.Linq" #>
+<#@ import namespace="System.Text" #>
+<#@ import namespace="System.Collections.Generic" #>
+<#@ output extension=".cs" #>
+<#
+var types = new[]
+{
+    new { source = typeof(int),     target = typeof(double)  },
+    new { source = typeof(long),    target = typeof(double)  },
+    new { source = typeof(float),   target = typeof(float)   },
+    new { source = typeof(double),  target = typeof(double)  },
+    new { source = typeof(decimal), target = typeof(decimal) },
+};
+var name = "Average";
+#>
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+<#
+foreach (var t in types)
+{
+    var sourceType = t.source;
+    var targetType = t.target;
+
+    foreach (var n in new[] { false, true })
+    {
+        var sourceTypeName = n ? sourceType.Name + "?" : sourceType.Name;
+        var targetTypeName = n ? targetType.Name + "?" : targetType.Name;
+        var methodName = n ? "Nullable" + sourceType.Name : sourceType.Name;
+
+#>
+        public static IAsyncObservable<<#=targetTypeName#>> <#=name#>(this IAsyncObservable<<#=sourceTypeName#>> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<<#=targetTypeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer)));
+        }
+
+        public static IAsyncObservable<<#=targetTypeName#>> <#=name#><TSource>(this IAsyncObservable<TSource> source, Func<TSource, <#=sourceTypeName#>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<<#=targetTypeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer, selector)));
+        }
+
+        public static IAsyncObservable<<#=targetTypeName#>> <#=name#><TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<<#=sourceTypeName#>>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<<#=targetTypeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer, selector)));
+        }
+
+<#
+    }
+}
+#>
+    }
+
+    partial class AsyncObserver
+    {
+<#
+foreach (var t in types)
+{
+    var sourceType = t.source;
+    var targetType = t.target;
+
+    foreach (var n in new[] { false, true })
+    {
+        var sourceTypeName = n ? sourceType.Name + "?" : sourceType.Name;
+        var targetTypeName = n ? targetType.Name + "?" : targetType.Name;
+        var methodName = n ? "Nullable" + sourceType.Name : sourceType.Name;
+
+#>
+        public static IAsyncObserver<TSource> <#=name#><#=methodName#><TSource>(this IAsyncObserver<<#=targetTypeName#>> observer, Func<TSource, <#=sourceTypeName#>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(<#=name#><#=methodName#>(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> <#=name#><#=methodName#><TSource>(this IAsyncObserver<<#=targetTypeName#>> observer, Func<TSource, Task<<#=sourceTypeName#>>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(<#=name#><#=methodName#>(observer), selector);
+        }
+
+<#
+    }
+}
+#>
+    }
+}

+ 579 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Average.cs

@@ -0,0 +1,579 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObserver
+    {
+        public static IAsyncObserver<int> AverageInt32(IAsyncObserver<double> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0L;
+            var count = 0L;
+
+            return Create<int>(
+                async x =>
+                {
+                    try
+                    {
+                        checked
+                        {
+                            sum += x;
+                            count++;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(double);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = (double)sum / count;
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<long> AverageInt64(IAsyncObserver<double> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0L;
+            var count = 0L;
+
+            return Create<long>(
+                async x =>
+                {
+                    try
+                    {
+                        checked
+                        {
+                            sum += x;
+                            count++;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(double);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = (double)sum / count;
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<float> AverageSingle(IAsyncObserver<float> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0.0;
+            var count = 0L;
+
+            return Create<float>(
+                async x =>
+                {
+                    try
+                    {
+                        checked
+                        {
+                            sum += x;
+                            count++;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(float);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = (float)(sum / count);
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<double> AverageDouble(IAsyncObserver<double> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0.0;
+            var count = 0L;
+
+            return Create<double>(
+                async x =>
+                {
+                    try
+                    {
+                        checked
+                        {
+                            sum += x;
+                            count++;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(double);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = sum / count;
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<decimal> AverageDecimal(IAsyncObserver<decimal> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0m;
+            var count = 0L;
+
+            return Create<decimal>(
+                async x =>
+                {
+                    try
+                    {
+                        checked
+                        {
+                            sum += x;
+                            count++;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(decimal);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = sum / count;
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<int?> AverageNullableInt32(IAsyncObserver<double?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0L;
+            var count = 0L;
+
+            return Create<int?>(
+                async x =>
+                {
+                    try
+                    {
+                        if (x.HasValue)
+                        {
+                            checked
+                            {
+                                sum += x.GetValueOrDefault();
+                                count++;
+                            }
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(double);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = (double)sum / count;
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(null).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<long?> AverageNullableInt64(IAsyncObserver<double?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0L;
+            var count = 0L;
+
+            return Create<long?>(
+                async x =>
+                {
+                    try
+                    {
+                        if (x.HasValue)
+                        {
+                            checked
+                            {
+                                sum += x.GetValueOrDefault();
+                                count++;
+                            }
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(double);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = (double)sum / count;
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(null).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<float?> AverageNullableSingle(IAsyncObserver<float?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0.0;
+            var count = 0L;
+
+            return Create<float?>(
+                async x =>
+                {
+                    try
+                    {
+                        if (x.HasValue)
+                        {
+                            checked
+                            {
+                                sum += x.GetValueOrDefault();
+                                count++;
+                            }
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(float);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = (float)(sum / count);
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(null).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<double?> AverageNullableDouble(IAsyncObserver<double?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0.0;
+            var count = 0L;
+
+            return Create<double?>(
+                async x =>
+                {
+                    try
+                    {
+                        if (x.HasValue)
+                        {
+                            checked
+                            {
+                                sum += x.GetValueOrDefault();
+                                count++;
+                            }
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(double);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = sum / count;
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(null).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<decimal?> AverageNullableDecimal(IAsyncObserver<decimal?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0m;
+            var count = 0L;
+
+            return Create<decimal?>(
+                async x =>
+                {
+                    try
+                    {
+                        if (x.HasValue)
+                        {
+                            checked
+                            {
+                                sum += x.GetValueOrDefault();
+                                count++;
+                            }
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (count > 0)
+                    {
+                        var res = default(decimal);
+
+                        try
+                        {
+                            checked
+                            {
+                                res = sum / count;
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await observer.OnNextAsync(res).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(null).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+    }
+}

+ 0 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Min.Generated.cs → AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Min - Copy.Generated.cs