소스 검색

Adding Sum.

Bart De Smet 8 년 전
부모
커밋
c9e4fad755

+ 18 - 4
AsyncRx.NET/System.Reactive.Async/System.Reactive.Async.csproj

@@ -9,13 +9,17 @@
       <LastGenOutput>Average.Generated.cs</LastGenOutput>
       <Generator>TextTemplatingFileGenerator</Generator>
     </None>
+    <None Update="System\Reactive\Linq\Operators\Sum.Generated.tt">
+      <LastGenOutput>Sum.Generated.cs</LastGenOutput>
+      <Generator>TextTemplatingFileGenerator</Generator>
+    </None>
     <None Update="System\Reactive\Linq\Operators\Min.Generated.tt">
       <LastGenOutput>Min - Copy.Generated.cs</LastGenOutput>
       <Generator>TextTemplatingFileGenerator</Generator>
     </None>
     <None Update="System\Reactive\Linq\Operators\Max.Generated.tt">
       <Generator>TextTemplatingFileGenerator</Generator>
-      <LastGenOutput>Max - Copy.Generated.cs</LastGenOutput>
+      <LastGenOutput>Max.Generated.cs</LastGenOutput>
     </None>
   </ItemGroup>
 
@@ -29,16 +33,26 @@
       <AutoGen>True</AutoGen>
       <DependentUpon>Average.Generated.tt</DependentUpon>
     </Compile>
-    <Compile Update="System\Reactive\Linq\Operators\Max - Copy.Generated.cs">
-      <DependentUpon>Max - Copy.Generated.tt</DependentUpon>
+    <Compile Update="System\Reactive\Linq\Operators\Max - Copy - Copy.Generated.cs">
+      <DependentUpon>Sum.Generated.tt</DependentUpon>
       <DesignTime>True</DesignTime>
       <AutoGen>True</AutoGen>
     </Compile>
-    <Compile Update="System\Reactive\Linq\Operators\Max - Copy.Generated.cs">
+    <Compile Update="System\Reactive\Linq\Operators\Max.Generated.cs">
       <DesignTime>True</DesignTime>
       <AutoGen>True</AutoGen>
       <DependentUpon>Max.Generated.tt</DependentUpon>
     </Compile>
+    <Compile Update="System\Reactive\Linq\Operators\Sum.Generated.cs">
+      <DependentUpon>Max - Copy.Generated.tt</DependentUpon>
+      <DesignTime>True</DesignTime>
+      <AutoGen>True</AutoGen>
+    </Compile>
+    <Compile Update="System\Reactive\Linq\Operators\Sum.Generated.cs">
+      <DesignTime>True</DesignTime>
+      <AutoGen>True</AutoGen>
+      <DependentUpon>Sum.Generated.tt</DependentUpon>
+    </Compile>
     <Compile Update="System\Reactive\Linq\Operators\Min - Copy.Generated.cs">
       <DependentUpon>Min - Copy.Generated.tt</DependentUpon>
       <DesignTime>True</DesignTime>

+ 0 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Max - Copy.Generated.cs → AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Max.Generated.cs


+ 496 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Sum.Generated.cs

@@ -0,0 +1,496 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+        public static IAsyncObservable<Int32> Sum(this IAsyncObservable<Int32> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int32>(observer => source.SubscribeAsync(AsyncObserver.SumInt32(observer)));
+        }
+
+        public static IAsyncObservable<Int32> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int32> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32>(observer => source.SubscribeAsync(AsyncObserver.SumInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int32> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int32>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32>(observer => source.SubscribeAsync(AsyncObserver.SumInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int32?> Sum(this IAsyncObservable<Int32?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int32?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableInt32(observer)));
+        }
+
+        public static IAsyncObservable<Int32?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int32?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int32?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int32?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64> Sum(this IAsyncObservable<Int64> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int64>(observer => source.SubscribeAsync(AsyncObserver.SumInt64(observer)));
+        }
+
+        public static IAsyncObservable<Int64> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int64> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64>(observer => source.SubscribeAsync(AsyncObserver.SumInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int64>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64>(observer => source.SubscribeAsync(AsyncObserver.SumInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64?> Sum(this IAsyncObservable<Int64?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int64?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableInt64(observer)));
+        }
+
+        public static IAsyncObservable<Int64?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int64?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int64?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single> Sum(this IAsyncObservable<Single> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.SumSingle(observer)));
+        }
+
+        public static IAsyncObservable<Single> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Single> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.SumSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Single>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.SumSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single?> Sum(this IAsyncObservable<Single?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableSingle(observer)));
+        }
+
+        public static IAsyncObservable<Single?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Single?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Single?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Sum(this IAsyncObservable<Double> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.SumDouble(observer)));
+        }
+
+        public static IAsyncObservable<Double> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Double> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.SumDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Double>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.SumDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Sum(this IAsyncObservable<Double?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableDouble(observer)));
+        }
+
+        public static IAsyncObservable<Double?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Double?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Double?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal> Sum(this IAsyncObservable<Decimal> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.SumDecimal(observer)));
+        }
+
+        public static IAsyncObservable<Decimal> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Decimal> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.SumDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Decimal>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.SumDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal?> Sum(this IAsyncObservable<Decimal?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableDecimal(observer)));
+        }
+
+        public static IAsyncObservable<Decimal?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Decimal?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal?> Sum<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Decimal?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.SumNullableDecimal(observer, selector)));
+        }
+
+    }
+
+    partial class AsyncObserver
+    {
+        public static IAsyncObserver<TSource> SumInt32<TSource>(this IAsyncObserver<Int32> observer, Func<TSource, Int32> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumInt32<TSource>(this IAsyncObserver<Int32> observer, Func<TSource, Task<Int32>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableInt32<TSource>(this IAsyncObserver<Int32?> observer, Func<TSource, Int32?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableInt32<TSource>(this IAsyncObserver<Int32?> observer, Func<TSource, Task<Int32?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumInt64<TSource>(this IAsyncObserver<Int64> observer, Func<TSource, Int64> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumInt64<TSource>(this IAsyncObserver<Int64> observer, Func<TSource, Task<Int64>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableInt64<TSource>(this IAsyncObserver<Int64?> observer, Func<TSource, Int64?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableInt64<TSource>(this IAsyncObserver<Int64?> observer, Func<TSource, Task<Int64?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumSingle<TSource>(this IAsyncObserver<Single> observer, Func<TSource, Single> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumSingle<TSource>(this IAsyncObserver<Single> observer, Func<TSource, Task<Single>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableSingle<TSource>(this IAsyncObserver<Single?> observer, Func<TSource, Single?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableSingle<TSource>(this IAsyncObserver<Single?> observer, Func<TSource, Task<Single?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumDouble<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Double> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumDouble<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Task<Double>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableDouble<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Double?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableDouble<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Task<Double?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumDecimal<TSource>(this IAsyncObserver<Decimal> observer, Func<TSource, Decimal> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumDecimal<TSource>(this IAsyncObserver<Decimal> observer, Func<TSource, Task<Decimal>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableDecimal<TSource>(this IAsyncObserver<Decimal?> observer, Func<TSource, Decimal?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> SumNullableDecimal<TSource>(this IAsyncObserver<Decimal?> observer, Func<TSource, Task<Decimal?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(SumNullableDecimal(observer), selector);
+        }
+
+    }
+}

+ 100 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Sum.Generated.tt

@@ -0,0 +1,100 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+<#@ template debug="false" hostspecific="false" language="C#" #>
+<#@ assembly name="System.Core" #>
+<#@ import namespace="System.Linq" #>
+<#@ import namespace="System.Text" #>
+<#@ import namespace="System.Collections.Generic" #>
+<#@ output extension=".cs" #>
+<#
+var types = new[] { typeof(int), typeof(long), typeof(float), typeof(double), typeof(decimal) };
+var name = "Sum";
+#>
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+<#
+foreach (var t in types)
+{
+    foreach (var n in new[] { false, true })
+    {
+        var typeName = n ? t.Name + "?" : t.Name;
+        var methodName = n ? "Nullable" + t.Name : t.Name;
+
+#>
+        public static IAsyncObservable<<#=typeName#>> <#=name#>(this IAsyncObservable<<#=typeName#>> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<<#=typeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer)));
+        }
+
+        public static IAsyncObservable<<#=typeName#>> <#=name#><TSource>(this IAsyncObservable<TSource> source, Func<TSource, <#=typeName#>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<<#=typeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer, selector)));
+        }
+
+        public static IAsyncObservable<<#=typeName#>> <#=name#><TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<<#=typeName#>>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<<#=typeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer, selector)));
+        }
+
+<#
+    }
+}
+#>
+    }
+
+    partial class AsyncObserver
+    {
+<#
+foreach (var t in types)
+{
+    foreach (var n in new[] { false, true })
+    {
+        var typeName = n ? t.Name + "?" : t.Name;
+        var methodName = n ? "Nullable" + t.Name : t.Name;
+
+#>
+        public static IAsyncObserver<TSource> <#=name#><#=methodName#><TSource>(this IAsyncObserver<<#=typeName#>> observer, Func<TSource, <#=typeName#>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(<#=name#><#=methodName#>(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> <#=name#><#=methodName#><TSource>(this IAsyncObserver<<#=typeName#>> observer, Func<TSource, Task<<#=typeName#>>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(<#=name#><#=methodName#>(observer), selector);
+        }
+
+<#
+    }
+}
+#>
+    }
+}

+ 318 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Sum.cs

@@ -0,0 +1,318 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObserver
+    {
+        public static IAsyncObserver<int> SumInt32(IAsyncObserver<int> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0;
+
+            return Create<int>(
+                async x =>
+                {
+                    try
+                    {
+                        checked
+                        {
+                            sum += x;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(sum).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<long> SumInt64(IAsyncObserver<long> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0L;
+
+            return Create<long>(
+                async x =>
+                {
+                    try
+                    {
+                        checked
+                        {
+                            sum += x;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(sum).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<float> SumSingle(IAsyncObserver<float> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0.0;
+
+            return Create<float>(
+                x =>
+                {
+                    sum += x;
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    var res = default(float);
+
+                    try
+                    {
+                        checked
+                        {
+                            res = (float)sum;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                        return;
+                    }
+
+                    await observer.OnNextAsync(res).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<double> SumDouble(IAsyncObserver<double> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0.0;
+
+            return Create<double>(
+                x =>
+                {
+                    sum += x;
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(sum).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<decimal> SumDecimal(IAsyncObserver<decimal> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0m;
+
+            return Create<decimal>(
+                x =>
+                {
+                    sum += x;
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(sum).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<int?> SumNullableInt32(IAsyncObserver<int?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0;
+
+            return Create<int?>(
+                async x =>
+                {
+                    try
+                    {
+                        checked
+                        {
+                            if (x != null)
+                            {
+                                sum += x.GetValueOrDefault();
+                            }
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(sum).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<long?> SumNullableInt64(IAsyncObserver<long?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = (long?)0L;
+
+            return Create<long?>(
+                async x =>
+                {
+                    try
+                    {
+                        checked
+                        {
+                            if (x != null)
+                            {
+                                sum += x.GetValueOrDefault();
+                            }
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                    }
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(sum).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<float?> SumNullableSingle(IAsyncObserver<float?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0.0;
+
+            return Create<float?>(
+                x =>
+                {
+                    if (x != null)
+                    {
+                        sum += x.GetValueOrDefault();
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    var res = default(float);
+
+                    try
+                    {
+                        checked
+                        {
+                            res = (float)sum;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                        return;
+                    }
+
+                    await observer.OnNextAsync(res).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<double?> SumNullableDouble(IAsyncObserver<double?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0.0;
+
+            return Create<double?>(
+                x =>
+                {
+                    if (x != null)
+                    {
+                        sum += x.GetValueOrDefault();
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(sum).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<decimal?> SumNullableDecimal(IAsyncObserver<decimal?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var sum = 0m;
+
+            return Create<decimal?>(
+                x =>
+                {
+                    if (x != null)
+                    {
+                        sum += x.GetValueOrDefault();
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(sum).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+    }
+}