Browse Source

Adding Min and Max.

Bart De Smet 8 years ago
parent
commit
4882ab11fe

+ 33 - 0
AsyncRx.NET/System.Reactive.Async/System.Reactive.Async.csproj

@@ -4,4 +4,37 @@
     <TargetFramework>netstandard2.0</TargetFramework>
   </PropertyGroup>
 
+  <ItemGroup>
+    <None Update="System\Reactive\Linq\Operators\Min.Generated.tt">
+      <LastGenOutput>Min.Generated.cs</LastGenOutput>
+      <Generator>TextTemplatingFileGenerator</Generator>
+    </None>
+    <None Update="System\Reactive\Linq\Operators\Max.Generated.tt">
+      <Generator>TextTemplatingFileGenerator</Generator>
+      <LastGenOutput>Max - Copy.Generated.cs</LastGenOutput>
+    </None>
+  </ItemGroup>
+
+  <ItemGroup>
+    <Service Include="{508349b6-6b84-4df5-91f0-309beebad82d}" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <Compile Update="System\Reactive\Linq\Operators\Max - Copy.Generated.cs">
+      <DependentUpon>Max - Copy.Generated.tt</DependentUpon>
+      <DesignTime>True</DesignTime>
+      <AutoGen>True</AutoGen>
+    </Compile>
+    <Compile Update="System\Reactive\Linq\Operators\Max - Copy.Generated.cs">
+      <DesignTime>True</DesignTime>
+      <AutoGen>True</AutoGen>
+      <DependentUpon>Max.Generated.tt</DependentUpon>
+    </Compile>
+    <Compile Update="System\Reactive\Linq\Operators\Min.Generated.cs">
+      <DesignTime>True</DesignTime>
+      <AutoGen>True</AutoGen>
+      <DependentUpon>Min.Generated.tt</DependentUpon>
+    </Compile>
+  </ItemGroup>
+
 </Project>

+ 496 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Max - Copy.Generated.cs

@@ -0,0 +1,496 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+        public static IAsyncObservable<Int32> Max(this IAsyncObservable<Int32> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int32>(observer => source.SubscribeAsync(AsyncObserver.MaxInt32(observer)));
+        }
+
+        public static IAsyncObservable<Int32> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int32> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32>(observer => source.SubscribeAsync(AsyncObserver.MaxInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int32> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int32>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32>(observer => source.SubscribeAsync(AsyncObserver.MaxInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int32?> Max(this IAsyncObservable<Int32?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int32?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableInt32(observer)));
+        }
+
+        public static IAsyncObservable<Int32?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int32?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int32?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int32?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64> Max(this IAsyncObservable<Int64> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int64>(observer => source.SubscribeAsync(AsyncObserver.MaxInt64(observer)));
+        }
+
+        public static IAsyncObservable<Int64> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int64> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64>(observer => source.SubscribeAsync(AsyncObserver.MaxInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int64>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64>(observer => source.SubscribeAsync(AsyncObserver.MaxInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64?> Max(this IAsyncObservable<Int64?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int64?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableInt64(observer)));
+        }
+
+        public static IAsyncObservable<Int64?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int64?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int64?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single> Max(this IAsyncObservable<Single> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.MaxSingle(observer)));
+        }
+
+        public static IAsyncObservable<Single> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Single> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.MaxSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Single>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.MaxSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single?> Max(this IAsyncObservable<Single?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableSingle(observer)));
+        }
+
+        public static IAsyncObservable<Single?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Single?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Single?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Max(this IAsyncObservable<Double> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.MaxDouble(observer)));
+        }
+
+        public static IAsyncObservable<Double> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Double> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.MaxDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Double>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.MaxDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Max(this IAsyncObservable<Double?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableDouble(observer)));
+        }
+
+        public static IAsyncObservable<Double?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Double?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Double?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal> Max(this IAsyncObservable<Decimal> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.MaxDecimal(observer)));
+        }
+
+        public static IAsyncObservable<Decimal> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Decimal> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.MaxDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Decimal>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.MaxDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal?> Max(this IAsyncObservable<Decimal?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableDecimal(observer)));
+        }
+
+        public static IAsyncObservable<Decimal?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Decimal?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal?> Max<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Decimal?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.MaxNullableDecimal(observer, selector)));
+        }
+
+    }
+
+    partial class AsyncObserver
+    {
+        public static IAsyncObserver<TSource> MaxInt32<TSource>(this IAsyncObserver<Int32> observer, Func<TSource, Int32> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxInt32<TSource>(this IAsyncObserver<Int32> observer, Func<TSource, Task<Int32>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableInt32<TSource>(this IAsyncObserver<Int32?> observer, Func<TSource, Int32?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableInt32<TSource>(this IAsyncObserver<Int32?> observer, Func<TSource, Task<Int32?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxInt64<TSource>(this IAsyncObserver<Int64> observer, Func<TSource, Int64> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxInt64<TSource>(this IAsyncObserver<Int64> observer, Func<TSource, Task<Int64>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableInt64<TSource>(this IAsyncObserver<Int64?> observer, Func<TSource, Int64?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableInt64<TSource>(this IAsyncObserver<Int64?> observer, Func<TSource, Task<Int64?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxSingle<TSource>(this IAsyncObserver<Single> observer, Func<TSource, Single> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxSingle<TSource>(this IAsyncObserver<Single> observer, Func<TSource, Task<Single>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableSingle<TSource>(this IAsyncObserver<Single?> observer, Func<TSource, Single?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableSingle<TSource>(this IAsyncObserver<Single?> observer, Func<TSource, Task<Single?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxDouble<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Double> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxDouble<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Task<Double>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableDouble<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Double?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableDouble<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Task<Double?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxDecimal<TSource>(this IAsyncObserver<Decimal> observer, Func<TSource, Decimal> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxDecimal<TSource>(this IAsyncObserver<Decimal> observer, Func<TSource, Task<Decimal>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableDecimal<TSource>(this IAsyncObserver<Decimal?> observer, Func<TSource, Decimal?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MaxNullableDecimal<TSource>(this IAsyncObserver<Decimal?> observer, Func<TSource, Task<Decimal?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MaxNullableDecimal(observer), selector);
+        }
+
+    }
+}

+ 100 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Max.Generated.tt

@@ -0,0 +1,100 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+<#@ template debug="false" hostspecific="false" language="C#" #>
+<#@ assembly name="System.Core" #>
+<#@ import namespace="System.Linq" #>
+<#@ import namespace="System.Text" #>
+<#@ import namespace="System.Collections.Generic" #>
+<#@ output extension=".cs" #>
+<#
+var types = new[] { typeof(int), typeof(long), typeof(float), typeof(double), typeof(decimal) };
+var name = "Max";
+#>
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+<#
+foreach (var t in types)
+{
+    foreach (var n in new[] { false, true })
+    {
+        var typeName = n ? t.Name + "?" : t.Name;
+        var methodName = n ? "Nullable" + t.Name : t.Name;
+
+#>
+        public static IAsyncObservable<<#=typeName#>> <#=name#>(this IAsyncObservable<<#=typeName#>> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<<#=typeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer)));
+        }
+
+        public static IAsyncObservable<<#=typeName#>> <#=name#><TSource>(this IAsyncObservable<TSource> source, Func<TSource, <#=typeName#>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<<#=typeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer, selector)));
+        }
+
+        public static IAsyncObservable<<#=typeName#>> <#=name#><TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<<#=typeName#>>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<<#=typeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer, selector)));
+        }
+
+<#
+    }
+}
+#>
+    }
+
+    partial class AsyncObserver
+    {
+<#
+foreach (var t in types)
+{
+    foreach (var n in new[] { false, true })
+    {
+        var typeName = n ? t.Name + "?" : t.Name;
+        var methodName = n ? "Nullable" + t.Name : t.Name;
+
+#>
+        public static IAsyncObserver<TSource> <#=name#><#=methodName#><TSource>(this IAsyncObserver<<#=typeName#>> observer, Func<TSource, <#=typeName#>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(<#=name#><#=methodName#>(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> <#=name#><#=methodName#><TSource>(this IAsyncObserver<<#=typeName#>> observer, Func<TSource, Task<<#=typeName#>>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(<#=name#><#=methodName#>(observer), selector);
+        }
+
+<#
+    }
+}
+#>
+    }
+}

+ 351 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Max.cs

@@ -0,0 +1,351 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObserver
+    {
+        public static IAsyncObserver<int> MaxInt32(IAsyncObserver<int> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = 0;
+            var found = false;
+
+            return Create<int>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x > max)
+                        {
+                            max = x;
+                        }
+                    }
+                    else
+                    {
+                        max = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(max).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<long> MaxInt64(IAsyncObserver<long> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = 0L;
+            var found = false;
+
+            return Create<long>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x > max)
+                        {
+                            max = x;
+                        }
+                    }
+                    else
+                    {
+                        max = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(max).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<float> MaxSingle(IAsyncObserver<float> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = 0.0f;
+            var found = false;
+
+            return Create<float>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x > max || double.IsNaN(x))
+                        {
+                            max = x;
+                        }
+                    }
+                    else
+                    {
+                        max = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(max).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<double> MaxDouble(IAsyncObserver<double> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = 0.0;
+            var found = false;
+
+            return Create<double>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x > max || double.IsNaN(x))
+                        {
+                            max = x;
+                        }
+                    }
+                    else
+                    {
+                        max = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(max).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<decimal> MaxDecimal(IAsyncObserver<decimal> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = 0m;
+            var found = false;
+
+            return Create<decimal>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x > max)
+                        {
+                            max = x;
+                        }
+                    }
+                    else
+                    {
+                        max = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(max).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<int?> MaxNullableInt32(IAsyncObserver<int?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = default(int?);
+
+            return Create<int?>(
+                x =>
+                {
+                    if (max == null || x > max)
+                    {
+                        max = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(max).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<long?> MaxNullableInt64(IAsyncObserver<long?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = default(long?);
+
+            return Create<long?>(
+                x =>
+                {
+                    if (max == null || x > max)
+                    {
+                        max = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(max).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<float?> MaxNullableSingle(IAsyncObserver<float?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = default(float?);
+
+            return Create<float?>(
+                x =>
+                {
+                    if (x != null && (max == null || x > max || double.IsNaN(x.Value)))
+                    {
+                        max = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(max).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<double?> MaxNullableDouble(IAsyncObserver<double?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = default(double?);
+
+            return Create<double?>(
+                x =>
+                {
+                    if (x != null && (max == null || x > max || double.IsNaN(x.Value)))
+                    {
+                        max = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(max).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<decimal?> MaxNullableDecimal(IAsyncObserver<decimal?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var max = default(decimal?);
+
+            return Create<decimal?>(
+                x =>
+                {
+                    if (max == null || x > max)
+                    {
+                        max = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(max).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+    }
+}

+ 496 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Min.Generated.cs

@@ -0,0 +1,496 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+        public static IAsyncObservable<Int32> Min(this IAsyncObservable<Int32> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int32>(observer => source.SubscribeAsync(AsyncObserver.MinInt32(observer)));
+        }
+
+        public static IAsyncObservable<Int32> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int32> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32>(observer => source.SubscribeAsync(AsyncObserver.MinInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int32> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int32>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32>(observer => source.SubscribeAsync(AsyncObserver.MinInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int32?> Min(this IAsyncObservable<Int32?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int32?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableInt32(observer)));
+        }
+
+        public static IAsyncObservable<Int32?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int32?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int32?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int32?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int32?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableInt32(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64> Min(this IAsyncObservable<Int64> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int64>(observer => source.SubscribeAsync(AsyncObserver.MinInt64(observer)));
+        }
+
+        public static IAsyncObservable<Int64> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int64> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64>(observer => source.SubscribeAsync(AsyncObserver.MinInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int64>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64>(observer => source.SubscribeAsync(AsyncObserver.MinInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64?> Min(this IAsyncObservable<Int64?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Int64?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableInt64(observer)));
+        }
+
+        public static IAsyncObservable<Int64?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Int64?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Int64?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Int64?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Int64?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableInt64(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single> Min(this IAsyncObservable<Single> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.MinSingle(observer)));
+        }
+
+        public static IAsyncObservable<Single> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Single> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.MinSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Single>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single>(observer => source.SubscribeAsync(AsyncObserver.MinSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single?> Min(this IAsyncObservable<Single?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableSingle(observer)));
+        }
+
+        public static IAsyncObservable<Single?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Single?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Single?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Single?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Single?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableSingle(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Min(this IAsyncObservable<Double> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.MinDouble(observer)));
+        }
+
+        public static IAsyncObservable<Double> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Double> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.MinDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Double>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double>(observer => source.SubscribeAsync(AsyncObserver.MinDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Min(this IAsyncObservable<Double?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableDouble(observer)));
+        }
+
+        public static IAsyncObservable<Double?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Double?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Double?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Double?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Double?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableDouble(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal> Min(this IAsyncObservable<Decimal> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.MinDecimal(observer)));
+        }
+
+        public static IAsyncObservable<Decimal> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Decimal> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.MinDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Decimal>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal>(observer => source.SubscribeAsync(AsyncObserver.MinDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal?> Min(this IAsyncObservable<Decimal?> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableDecimal(observer)));
+        }
+
+        public static IAsyncObservable<Decimal?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Decimal?> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableDecimal(observer, selector)));
+        }
+
+        public static IAsyncObservable<Decimal?> Min<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<Decimal?>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<Decimal?>(observer => source.SubscribeAsync(AsyncObserver.MinNullableDecimal(observer, selector)));
+        }
+
+    }
+
+    partial class AsyncObserver
+    {
+        public static IAsyncObserver<TSource> MinInt32<TSource>(this IAsyncObserver<Int32> observer, Func<TSource, Int32> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinInt32<TSource>(this IAsyncObserver<Int32> observer, Func<TSource, Task<Int32>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableInt32<TSource>(this IAsyncObserver<Int32?> observer, Func<TSource, Int32?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableInt32<TSource>(this IAsyncObserver<Int32?> observer, Func<TSource, Task<Int32?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableInt32(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinInt64<TSource>(this IAsyncObserver<Int64> observer, Func<TSource, Int64> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinInt64<TSource>(this IAsyncObserver<Int64> observer, Func<TSource, Task<Int64>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableInt64<TSource>(this IAsyncObserver<Int64?> observer, Func<TSource, Int64?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableInt64<TSource>(this IAsyncObserver<Int64?> observer, Func<TSource, Task<Int64?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableInt64(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinSingle<TSource>(this IAsyncObserver<Single> observer, Func<TSource, Single> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinSingle<TSource>(this IAsyncObserver<Single> observer, Func<TSource, Task<Single>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableSingle<TSource>(this IAsyncObserver<Single?> observer, Func<TSource, Single?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableSingle<TSource>(this IAsyncObserver<Single?> observer, Func<TSource, Task<Single?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableSingle(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinDouble<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Double> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinDouble<TSource>(this IAsyncObserver<Double> observer, Func<TSource, Task<Double>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableDouble<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Double?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableDouble<TSource>(this IAsyncObserver<Double?> observer, Func<TSource, Task<Double?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableDouble(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinDecimal<TSource>(this IAsyncObserver<Decimal> observer, Func<TSource, Decimal> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinDecimal<TSource>(this IAsyncObserver<Decimal> observer, Func<TSource, Task<Decimal>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableDecimal<TSource>(this IAsyncObserver<Decimal?> observer, Func<TSource, Decimal?> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableDecimal(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> MinNullableDecimal<TSource>(this IAsyncObserver<Decimal?> observer, Func<TSource, Task<Decimal?>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(MinNullableDecimal(observer), selector);
+        }
+
+    }
+}

+ 100 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Min.Generated.tt

@@ -0,0 +1,100 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+<#@ template debug="false" hostspecific="false" language="C#" #>
+<#@ assembly name="System.Core" #>
+<#@ import namespace="System.Linq" #>
+<#@ import namespace="System.Text" #>
+<#@ import namespace="System.Collections.Generic" #>
+<#@ output extension=".cs" #>
+<#
+var types = new[] { typeof(int), typeof(long), typeof(float), typeof(double), typeof(decimal) };
+var name = "Min";
+#>
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+<#
+foreach (var t in types)
+{
+    foreach (var n in new[] { false, true })
+    {
+        var typeName = n ? t.Name + "?" : t.Name;
+        var methodName = n ? "Nullable" + t.Name : t.Name;
+
+#>
+        public static IAsyncObservable<<#=typeName#>> <#=name#>(this IAsyncObservable<<#=typeName#>> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Create<<#=typeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer)));
+        }
+
+        public static IAsyncObservable<<#=typeName#>> <#=name#><TSource>(this IAsyncObservable<TSource> source, Func<TSource, <#=typeName#>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<<#=typeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer, selector)));
+        }
+
+        public static IAsyncObservable<<#=typeName#>> <#=name#><TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task<<#=typeName#>>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Create<<#=typeName#>>(observer => source.SubscribeAsync(AsyncObserver.<#=name#><#=methodName#>(observer, selector)));
+        }
+
+<#
+    }
+}
+#>
+    }
+
+    partial class AsyncObserver
+    {
+<#
+foreach (var t in types)
+{
+    foreach (var n in new[] { false, true })
+    {
+        var typeName = n ? t.Name + "?" : t.Name;
+        var methodName = n ? "Nullable" + t.Name : t.Name;
+
+#>
+        public static IAsyncObserver<TSource> <#=name#><#=methodName#><TSource>(this IAsyncObserver<<#=typeName#>> observer, Func<TSource, <#=typeName#>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(<#=name#><#=methodName#>(observer), selector);
+        }
+
+        public static IAsyncObserver<TSource> <#=name#><#=methodName#><TSource>(this IAsyncObserver<<#=typeName#>> observer, Func<TSource, Task<<#=typeName#>>> selector)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Select(<#=name#><#=methodName#>(observer), selector);
+        }
+
+<#
+    }
+}
+#>
+    }
+}

+ 351 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Min.cs

@@ -0,0 +1,351 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObserver
+    {
+        public static IAsyncObserver<int> MinInt32(this IAsyncObserver<int> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = 0;
+            var found = false;
+
+            return Create<int>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x < min)
+                        {
+                            min = x;
+                        }
+                    }
+                    else
+                    {
+                        min = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(min).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<long> MinInt64(this IAsyncObserver<long> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = 0L;
+            var found = false;
+
+            return Create<long>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x < min)
+                        {
+                            min = x;
+                        }
+                    }
+                    else
+                    {
+                        min = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(min).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<float> MinSingle(this IAsyncObserver<float> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = 0.0f;
+            var found = false;
+
+            return Create<float>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x < min || double.IsNaN(x))
+                        {
+                            min = x;
+                        }
+                    }
+                    else
+                    {
+                        min = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(min).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<double> MinDouble(this IAsyncObserver<double> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = 0.0;
+            var found = false;
+
+            return Create<double>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x < min || double.IsNaN(x))
+                        {
+                            min = x;
+                        }
+                    }
+                    else
+                    {
+                        min = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(min).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<decimal> MinDecimal(this IAsyncObserver<decimal> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = 0m;
+            var found = false;
+
+            return Create<decimal>(
+                x =>
+                {
+                    if (found)
+                    {
+                        if (x < min)
+                        {
+                            min = x;
+                        }
+                    }
+                    else
+                    {
+                        min = x;
+                        found = true;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    if (!found)
+                    {
+                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await observer.OnNextAsync(min).ConfigureAwait(false);
+                        await observer.OnCompletedAsync().ConfigureAwait(false);
+                    }
+                }
+            );
+        }
+
+        public static IAsyncObserver<int?> MinNullableInt32(this IAsyncObserver<int?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = default(int?);
+
+            return Create<int?>(
+                x =>
+                {
+                    if (min == null || x < min)
+                    {
+                        min = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(min).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<long?> MinNullableInt64(this IAsyncObserver<long?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = default(long?);
+
+            return Create<long?>(
+                x =>
+                {
+                    if (min == null || x < min)
+                    {
+                        min = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(min).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<float?> MinNullableSingle(this IAsyncObserver<float?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = default(float?);
+
+            return Create<float?>(
+                x =>
+                {
+                    if (x != null && (min == null || x < min || double.IsNaN(x.Value)))
+                    {
+                        min = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(min).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<double?> MinNullableDouble(this IAsyncObserver<double?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = default(double?);
+
+            return Create<double?>(
+                x =>
+                {
+                    if (x != null && (min == null || x < min || double.IsNaN(x.Value)))
+                    {
+                        min = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(min).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+
+        public static IAsyncObserver<decimal?> MinNullableDecimal(this IAsyncObserver<decimal?> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var min = default(decimal?);
+
+            return Create<decimal?>(
+                x =>
+                {
+                    if (min == null || x < min)
+                    {
+                        min = x;
+                    }
+
+                    return Task.CompletedTask;
+                },
+                observer.OnErrorAsync,
+                async () =>
+                {
+                    await observer.OnNextAsync(min).ConfigureAwait(false);
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
+        }
+    }
+}