Jelajahi Sumber

Adding FromAsyncPattern for functions.

Bart De Smet 8 tahun lalu
induk
melakukan
7a2305a0ea

+ 17 - 0
AsyncRx.NET/System.Reactive.Async/System.Reactive.Async.csproj

@@ -4,6 +4,14 @@
     <TargetFramework>netstandard2.0</TargetFramework>
   </PropertyGroup>
 
+  <ItemGroup>
+    <None Include="System\Reactive\Linq\Operators\FromAsyncPattern.Generated.cs">
+      <DesignTime>True</DesignTime>
+      <AutoGen>True</AutoGen>
+      <DependentUpon>FromAsyncPattern.Generated.tt</DependentUpon>
+    </None>
+  </ItemGroup>
+
   <ItemGroup>
     <None Update="System\Reactive\Linq\Operators\Average.Generated.tt">
       <LastGenOutput>Average.Generated.cs</LastGenOutput>
@@ -13,6 +21,10 @@
       <LastGenOutput>CombineLatest.Generated.cs</LastGenOutput>
       <Generator>TextTemplatingFileGenerator</Generator>
     </None>
+    <None Update="System\Reactive\Linq\Operators\FromAsyncPattern.Generated.tt">
+      <Generator>TextTemplatingFileGenerator</Generator>
+      <LastGenOutput>FromAsyncPattern.Generated.cs</LastGenOutput>
+    </None>
     <None Update="System\Reactive\Linq\Operators\Min.Generated.tt">
       <LastGenOutput>Min.Generated.cs</LastGenOutput>
       <Generator>TextTemplatingFileGenerator</Generator>
@@ -46,6 +58,11 @@
       <AutoGen>True</AutoGen>
       <DependentUpon>CombineLatest.Generated.tt</DependentUpon>
     </Compile>
+    <Compile Update="System\Reactive\Linq\Operators\FromAsyncPattern.Generated.cs">
+      <DesignTime>True</DesignTime>
+      <AutoGen>True</AutoGen>
+      <DependentUpon>FromAsyncPattern.Generated.tt</DependentUpon>
+    </Compile>
     <Compile Update="System\Reactive\Linq\Operators\Max.Generated.cs">
       <DesignTime>True</DesignTime>
       <AutoGen>True</AutoGen>

+ 614 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/FromAsyncPattern.Generated.cs

@@ -0,0 +1,614 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Reactive.Subjects;
+
+namespace System.Reactive.Linq
+{
+    // REVIEW: Consider if these are worth retaining in the async space.
+
+    partial class AsyncObservable
+    {
+        public static Func<IAsyncObservable<TResult>> FromAsyncPattern<TResult>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return () =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, IAsyncObservable<TResult>> FromAsyncPattern<T1, TResult>(Func<T1, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, TResult>(Func<T1, T2, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, TResult>(Func<T1, T2, T3, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, T6, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5, arg6) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, arg6, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, T6, T7, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5, arg6, arg7) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, arg6, arg7, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, T6, T7, T8, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+        public static Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IAsyncObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+    }
+}

+ 75 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/FromAsyncPattern.Generated.tt

@@ -0,0 +1,75 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+<#@ template debug="false" hostspecific="false" language="C#" #>
+<#@ assembly name="System.Core" #>
+<#@ import namespace="System.Linq" #>
+<#@ import namespace="System.Text" #>
+<#@ import namespace="System.Collections.Generic" #>
+<#@ output extension=".cs" #>
+using System.Reactive.Subjects;
+
+namespace System.Reactive.Linq
+{
+    // REVIEW: Consider if these are worth retaining in the async space.
+
+    partial class AsyncObservable
+    {
+<#
+for (var i = 0; i <= 14; i++)
+{
+    var args = Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "IAsyncObservable<TResult>" });
+    var ret = "Func<" + string.Join(", ", args) + ">";
+
+    var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
+    var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));
+
+    var begin = "Func<" + string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "AsyncCallback, object, IAsyncResult" })) + ">";
+    var end = "Func<IAsyncResult, TResult>";
+#>
+        public static <#=ret#> FromAsyncPattern<<#=genArgs#>>(<#=begin#> begin, <#=end#> end)
+        {
+            if (begin == null)
+                throw new ArgumentNullException(nameof(begin));
+            if (end == null)
+                throw new ArgumentNullException(nameof(end));
+
+            return (<#=pars#>) =>
+            {
+                var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                try
+                {
+                    begin(<#=pars + (i > 0 ? ", " : "")#>async iar =>
+                    {
+                        TResult result;
+
+                        try
+                        {
+                            result = end(iar);
+                        }
+                        catch (Exception ex)
+                        {
+                            await subject.OnErrorAsync(ex).ConfigureAwait(false);
+                            return;
+                        }
+
+                        await subject.OnNextAsync(result).ConfigureAwait(false);
+                        await subject.OnCompletedAsync().ConfigureAwait(false);
+                    }, null);
+                }
+                catch (Exception ex)
+                {
+                    return Throw<TResult>(ex);
+                }
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+<#
+}
+#>
+    }
+}