Pārlūkot izejas kodu

Move core logic of ToAsync to observers.

Bart De Smet 8 gadi atpakaļ
vecāks
revīzija
cd7580a8cb

+ 1279 - 594
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToAsync.Generated.cs

@@ -4,6 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Subjects;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
@@ -30,24 +31,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function();
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -72,24 +56,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -114,24 +81,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -156,24 +106,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -198,24 +131,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -240,24 +156,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -282,24 +181,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -324,24 +206,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -366,24 +231,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -408,24 +256,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -450,24 +281,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -492,24 +306,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -534,24 +331,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -576,24 +356,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -618,24 +381,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -660,24 +406,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -702,24 +431,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -744,23 +456,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action();
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -785,23 +481,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -826,23 +506,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -867,23 +531,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -908,23 +556,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -949,23 +581,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -990,23 +606,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1031,23 +631,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1072,23 +656,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1113,23 +681,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1154,23 +706,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1195,23 +731,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1236,23 +756,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1277,23 +781,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1318,23 +806,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1359,23 +831,7 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -1400,26 +856,1255 @@ namespace System.Reactive.Linq
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
+                AsyncObserver.ToAsync(subject, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
+
+                return subject.AsAsyncObservable();
+            };
+        }
+
+    }
+
+    partial class AsyncObserver
+    {
+        public static Task<IAsyncDisposable> ToAsync<TResult>(IAsyncObserver<TResult> observer, Func<TResult> function)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<TResult>(IAsyncObserver<TResult> observer, Func<TResult> function, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
 
-                scheduler.ScheduleAsync(async ct =>
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function();
+                }
+                catch (Exception ex)
                 {
-                    try
-                    {
-                        action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
 
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
 
-                return subject.AsAsyncObservable();
-            };
+        public static Task<IAsyncDisposable> ToAsync<T1, TResult>(IAsyncObserver<TResult> observer, Func<T1, TResult> function, T1 arg1)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, TResult>(IAsyncObserver<TResult> observer, Func<T1, TResult> function, T1 arg1, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, TResult> function, T1 arg1, T2 arg2)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, TResult> function, T1 arg1, T2 arg2, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, TResult> function, T1 arg1, T2 arg2, T3 arg3)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, TResult> function, T1 arg1, T2 arg2, T3 arg3, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, T15 arg15)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, T15 arg15, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, T15 arg15, T16 arg16)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(IAsyncObserver<TResult> observer, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, T15 arg15, T16 arg16, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync(IAsyncObserver<Unit> observer, Action action)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync(IAsyncObserver<Unit> observer, Action action, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action();
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1>(IAsyncObserver<Unit> observer, Action<T1> action, T1 arg1)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1>(IAsyncObserver<Unit> observer, Action<T1> action, T1 arg1, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2>(IAsyncObserver<Unit> observer, Action<T1, T2> action, T1 arg1, T2 arg2)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2>(IAsyncObserver<Unit> observer, Action<T1, T2> action, T1 arg1, T2 arg2, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3>(IAsyncObserver<Unit> observer, Action<T1, T2, T3> action, T1 arg1, T2 arg2, T3 arg3)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3>(IAsyncObserver<Unit> observer, Action<T1, T2, T3> action, T1 arg1, T2 arg2, T3 arg3, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, T15 arg15)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, T15 arg15, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, T15 arg15, T16 arg16)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16, TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(IAsyncObserver<Unit> observer, Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7, T8 arg8, T9 arg9, T10 arg10, T11 arg11, T12 arg12, T13 arg13, T14 arg14, T15 arg15, T16 arg16, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
         }
 
     }

+ 144 - 35
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ToAsync.Generated.tt

@@ -10,6 +10,7 @@
 <#@ output extension=".cs" #>
 using System.Reactive.Concurrency;
 using System.Reactive.Subjects;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
@@ -27,6 +28,13 @@ for (var i = 0; i <= 16; i++)
     var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));
 
     var type = "Func<" + string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" })) + ">";
+
+    var invokeArgs = pars;
+
+    if (i > 0)
+    {
+        invokeArgs = ", " + invokeArgs;
+    }
 #>
         public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function)
         {
@@ -47,24 +55,7 @@ for (var i = 0; i <= 16; i++)
             {
                 var subject = new SequentialAsyncAsyncSubject<TResult>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    TResult res;
-                    try
-                    {
-                        res = function(<#=pars#>);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(res).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, function<#=invokeArgs#>); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
@@ -89,6 +80,13 @@ for (var i = 0; i <= 16; i++)
         genArgs = "<" + genArgs + ">";
         type += genArgs;
     }
+
+    var invokeArgs = pars;
+
+    if (i > 0)
+    {
+        invokeArgs = ", " + invokeArgs;
+    }
 #>
         public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action)
         {
@@ -109,28 +107,139 @@ for (var i = 0; i <= 16; i++)
             {
                 var subject = new SequentialAsyncAsyncSubject<Unit>();
 
-                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
-
-                scheduler.ScheduleAsync(async ct =>
-                {
-                    try
-                    {
-                        action(<#=pars#>);
-                    }
-                    catch (Exception ex)
-                    {
-                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
-                        return;
-                    }
-
-                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
-                    await subject.OnCompletedAsync().RendezVous(scheduler);
-                });
+                AsyncObserver.ToAsync(subject, action<#=invokeArgs#>); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
 
                 return subject.AsAsyncObservable();
             };
         }
 
+<#
+}
+#>
+    }
+
+    partial class AsyncObserver
+    {
+<#
+for (var i = 0; i <= 16; i++)
+{
+    var args = Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "IAsyncObservable<TResult>" });
+    var ret = "Func<" + string.Join(", ", args) + ">";
+
+    var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
+    var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));
+
+    var type = "Func<" + string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" })) + ">";
+
+    var invokePars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j + " arg" + j));
+    var invokeArgs = pars;
+    if (i > 0)
+    {
+        invokePars = ", " + invokePars;
+        invokeArgs += ", ";
+    }
+#>
+        public static Task<IAsyncDisposable> ToAsync<<#=genArgs#>>(IAsyncObserver<TResult> observer, <#=type#> function<#=invokePars#>)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+
+            return ToAsync(observer, function, <#=invokeArgs#>TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<<#=genArgs#>>(IAsyncObserver<TResult> observer, <#=type#> function<#=invokePars#>, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (function == null)
+                throw new ArgumentNullException(nameof(function));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                TResult res;
+                try
+                {
+                    res = function(<#=pars#>);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(res).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
+<#
+}
+#>
+<#
+for (var i = 0; i <= 16; i++)
+{
+    var args = Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "IAsyncObservable<Unit>" });
+    var ret = "Func<" + string.Join(", ", args) + ">";
+
+    var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
+    var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));
+
+    var type = "Action";
+
+    if (genArgs != "")
+    {
+        genArgs = "<" + genArgs + ">";
+        type += genArgs;
+    }
+
+    var invokePars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j + " arg" + j));
+    var invokeArgs = pars;
+    if (i > 0)
+    {
+        invokePars = ", " + invokePars;
+        invokeArgs += ", ";
+    }
+#>
+        public static Task<IAsyncDisposable> ToAsync<#=genArgs#>(IAsyncObserver<Unit> observer, <#=type#> action<#=invokePars#>)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ToAsync(observer, action, <#=invokeArgs#>TaskPoolAsyncScheduler.Default);
+        }
+
+        public static Task<IAsyncDisposable> ToAsync<#=genArgs#>(IAsyncObserver<Unit> observer, <#=type#> action<#=invokePars#>, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    action(<#=pars#>);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).RendezVous(scheduler);
+                    return;
+                }
+
+                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            });
+        }
+
 <#
 }
 #>