123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- <#@ template debug="false" hostspecific="false" language="C#" #>
- <#@ assembly name="System.Core" #>
- <#@ import namespace="System.Linq" #>
- <#@ import namespace="System.Text" #>
- <#@ import namespace="System.Collections.Generic" #>
- <#@ output extension=".cs" #>
- using System.Reactive.Concurrency;
- using System.Reactive.Subjects;
- using System.Threading.Tasks;
- namespace System.Reactive.Linq
- {
- // REVIEW: Consider if these are worth retaining in the async space.
partial class AsyncObservable
{
<#
// Func<T1, ..., Ti, TResult> overloads, one pair per arity from 0 to 16.
for (var i = 0; i <= 16; i++)
{
    // T1..Ti and arg1..argi for the current arity (both empty when i == 0).
    var typeParams = Enumerable.Range(1, i).Select(j => "T" + j).ToArray();
    var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));

    var ret = "Func<" + string.Join(", ", typeParams.Concat(new[] { "IAsyncObservable<TResult>" })) + ">";
    var genArgs = string.Join(", ", typeParams.Concat(new[] { "TResult" }));
    var type = "Func<" + genArgs + ">";

    // Arguments forwarded after 'function'; a leading separator is only needed when there are any.
    var invokeArgs = i > 0 ? ", " + pars : pars;
#>
    /// <summary>
    /// Converts <paramref name="function"/> into a delegate that runs it using
    /// <see cref="TaskPoolAsyncScheduler.Default"/> and reports the outcome through an asynchronous observable.
    /// </summary>
    /// <param name="function">The function to convert.</param>
    /// <returns>The delegate produced by the overload that takes an explicit scheduler.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
    public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function)
    {
        if (function == null)
        {
            throw new ArgumentNullException(nameof(function));
        }

        return ToAsync(function, TaskPoolAsyncScheduler.Default);
    }

    /// <summary>
    /// Converts <paramref name="function"/> into a delegate that runs it on <paramref name="scheduler"/>
    /// and reports the outcome through an asynchronous observable.
    /// </summary>
    /// <param name="function">The function to convert.</param>
    /// <param name="scheduler">The scheduler handed to <c>AsyncObserver.ToAsync</c> to run the function.</param>
    /// <returns>
    /// A delegate that, on every invocation, creates a new <c>SequentialAsyncAsyncSubject</c>, asks
    /// <c>AsyncObserver.ToAsync</c> to run <paramref name="function"/> with the supplied arguments
    /// against that subject, and returns the subject as an observable.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="function"/> or <paramref name="scheduler"/> is null.</exception>
    public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function, IAsyncScheduler scheduler)
    {
        if (function == null)
        {
            throw new ArgumentNullException(nameof(function));
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException(nameof(scheduler));
        }

        return (<#=pars#>) =>
        {
            var subject = new SequentialAsyncAsyncSubject<TResult>();

            // Pass the caller's scheduler along explicitly: the AsyncObserver.ToAsync overload without a
            // scheduler parameter always uses TaskPoolAsyncScheduler.Default.
            // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
            AsyncObserver.ToAsync(subject, function<#=invokeArgs#>, scheduler);

            return subject.AsAsyncObservable();
        };
    }

<#
}
#>
<#
// Action<T1, ..., Ti> overloads, one pair per arity from 0 to 16.
for (var i = 0; i <= 16; i++)
{
    // T1..Ti and arg1..argi for the current arity (both empty when i == 0).
    var typeParams = Enumerable.Range(1, i).Select(j => "T" + j).ToArray();
    var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));

    var ret = "Func<" + string.Join(", ", typeParams.Concat(new[] { "IAsyncObservable<Unit>" })) + ">";

    // The parameterless Action overload is not generic, so no angle brackets are emitted for arity 0.
    var genArgs = i > 0 ? "<" + string.Join(", ", typeParams) + ">" : "";
    var type = "Action" + genArgs;

    // Arguments forwarded after 'action'; a leading separator is only needed when there are any.
    var invokeArgs = i > 0 ? ", " + pars : pars;
#>
    /// <summary>
    /// Converts <paramref name="action"/> into a delegate that runs it using
    /// <see cref="TaskPoolAsyncScheduler.Default"/> and reports the outcome through an asynchronous observable.
    /// </summary>
    /// <param name="action">The action to convert.</param>
    /// <returns>The delegate produced by the overload that takes an explicit scheduler.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        return ToAsync(action, TaskPoolAsyncScheduler.Default);
    }

    /// <summary>
    /// Converts <paramref name="action"/> into a delegate that runs it on <paramref name="scheduler"/>
    /// and reports the outcome through an asynchronous observable.
    /// </summary>
    /// <param name="action">The action to convert.</param>
    /// <param name="scheduler">The scheduler handed to <c>AsyncObserver.ToAsync</c> to run the action.</param>
    /// <returns>
    /// A delegate that, on every invocation, creates a new <c>SequentialAsyncAsyncSubject</c>, asks
    /// <c>AsyncObserver.ToAsync</c> to run <paramref name="action"/> with the supplied arguments
    /// against that subject, and returns the subject as an observable.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> or <paramref name="scheduler"/> is null.</exception>
    public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action, IAsyncScheduler scheduler)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException(nameof(scheduler));
        }

        return (<#=pars#>) =>
        {
            var subject = new SequentialAsyncAsyncSubject<Unit>();

            // Pass the caller's scheduler along explicitly: the AsyncObserver.ToAsync overload without a
            // scheduler parameter always uses TaskPoolAsyncScheduler.Default.
            // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
            AsyncObserver.ToAsync(subject, action<#=invokeArgs#>, scheduler);

            return subject.AsAsyncObservable();
        };
    }

<#
}
#>
}
partial class AsyncObserver
{
<#
// Overloads that run a Func<T1, ..., Tn, TResult> and report its outcome to an observer (n = 0..16).
for (var arity = 0; arity <= 16; arity++)
{
    var indices = Enumerable.Range(1, arity).ToArray();
    var argNames = string.Join(", ", indices.Select(n => "arg" + n));

    var typeParamList = string.Join(", ", indices.Select(n => "T" + n).Concat(new[] { "TResult" }));
    var delegateType = "Func<" + typeParamList + ">";

    // Formal parameters that follow 'function', and the same values forwarded ahead of the scheduler.
    var extraParams = arity > 0 ? ", " + string.Join(", ", indices.Select(n => "T" + n + " arg" + n)) : "";
    var forwarded = arity > 0 ? argNames + ", " : "";

    // One <param> line per forwarded argument so every generated parameter is documented.
    var argDocs = string.Concat(indices.Select(n =>
        "    /// <param name=\"arg" + n + "\">Value passed to <paramref name=\"function\"/> as argument " + n + ".</param>" + System.Environment.NewLine));
#>
    /// <summary>
    /// Runs <paramref name="function"/> using <see cref="TaskPoolAsyncScheduler.Default"/> and reports
    /// its outcome to <paramref name="observer"/>.
    /// </summary>
    /// <param name="observer">The observer that receives the outcome.</param>
    /// <param name="function">The function to invoke.</param>
<#=argDocs#>    /// <returns>The task returned by the overload that takes an explicit scheduler.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="function"/> is null.</exception>
    public static Task<IAsyncDisposable> ToAsync<<#=typeParamList#>>(IAsyncObserver<TResult> observer, <#=delegateType#> function<#=extraParams#>)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        if (function == null)
        {
            throw new ArgumentNullException(nameof(function));
        }

        return ToAsync(observer, function, <#=forwarded#>TaskPoolAsyncScheduler.Default);
    }

    /// <summary>
    /// Schedules work on <paramref name="scheduler"/> that invokes <paramref name="function"/> and reports
    /// its outcome to <paramref name="observer"/>: an exception thrown by the function is sent through
    /// <c>OnErrorAsync</c>; otherwise the result is sent through <c>OnNextAsync</c> followed by
    /// <c>OnCompletedAsync</c>.
    /// </summary>
    /// <param name="observer">The observer that receives the outcome.</param>
    /// <param name="function">The function to invoke.</param>
<#=argDocs#>    /// <param name="scheduler">The scheduler on which the work is scheduled.</param>
    /// <returns>The task returned by <c>scheduler.ScheduleAsync</c> for the scheduled work.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observer"/>, <paramref name="function"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static Task<IAsyncDisposable> ToAsync<<#=typeParamList#>>(IAsyncObserver<TResult> observer, <#=delegateType#> function<#=extraParams#>, IAsyncScheduler scheduler)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        if (function == null)
        {
            throw new ArgumentNullException(nameof(function));
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException(nameof(scheduler));
        }

        return scheduler.ScheduleAsync(async token =>
        {
            TResult result;

            // Only the invocation itself is guarded; failures while notifying the observer are not caught here.
            try
            {
                result = function(<#=argNames#>);
            }
            catch (Exception error)
            {
                await observer.OnErrorAsync(error).RendezVous(scheduler, token);
                return;
            }

            await observer.OnNextAsync(result).RendezVous(scheduler, token);
            await observer.OnCompletedAsync().RendezVous(scheduler, token);
        });
    }

<#
}
#>
<#
// Overloads that run an Action<T1, ..., Tn> and report its completion to an observer (n = 0..16).
for (var arity = 0; arity <= 16; arity++)
{
    var indices = Enumerable.Range(1, arity).ToArray();
    var argNames = string.Join(", ", indices.Select(n => "arg" + n));

    // The parameterless Action overload is not generic, so no angle brackets are emitted for arity 0.
    var typeParamList = arity > 0 ? "<" + string.Join(", ", indices.Select(n => "T" + n)) + ">" : "";
    var delegateType = "Action" + typeParamList;

    // Formal parameters that follow 'action', and the same values forwarded ahead of the scheduler.
    var extraParams = arity > 0 ? ", " + string.Join(", ", indices.Select(n => "T" + n + " arg" + n)) : "";
    var forwarded = arity > 0 ? argNames + ", " : "";

    // One <param> line per forwarded argument so every generated parameter is documented.
    var argDocs = string.Concat(indices.Select(n =>
        "    /// <param name=\"arg" + n + "\">Value passed to <paramref name=\"action\"/> as argument " + n + ".</param>" + System.Environment.NewLine));
#>
    /// <summary>
    /// Runs <paramref name="action"/> using <see cref="TaskPoolAsyncScheduler.Default"/> and reports
    /// its outcome to <paramref name="observer"/>.
    /// </summary>
    /// <param name="observer">The observer that receives the outcome.</param>
    /// <param name="action">The action to invoke.</param>
<#=argDocs#>    /// <returns>The task returned by the overload that takes an explicit scheduler.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="action"/> is null.</exception>
    public static Task<IAsyncDisposable> ToAsync<#=typeParamList#>(IAsyncObserver<Unit> observer, <#=delegateType#> action<#=extraParams#>)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        return ToAsync(observer, action, <#=forwarded#>TaskPoolAsyncScheduler.Default);
    }

    /// <summary>
    /// Schedules work on <paramref name="scheduler"/> that invokes <paramref name="action"/> and reports
    /// its outcome to <paramref name="observer"/>: an exception thrown by the action is sent through
    /// <c>OnErrorAsync</c>; otherwise <c>Unit.Default</c> is sent through <c>OnNextAsync</c> followed by
    /// <c>OnCompletedAsync</c>.
    /// </summary>
    /// <param name="observer">The observer that receives the outcome.</param>
    /// <param name="action">The action to invoke.</param>
<#=argDocs#>    /// <param name="scheduler">The scheduler on which the work is scheduled.</param>
    /// <returns>The task returned by <c>scheduler.ScheduleAsync</c> for the scheduled work.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observer"/>, <paramref name="action"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static Task<IAsyncDisposable> ToAsync<#=typeParamList#>(IAsyncObserver<Unit> observer, <#=delegateType#> action<#=extraParams#>, IAsyncScheduler scheduler)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException(nameof(scheduler));
        }

        return scheduler.ScheduleAsync(async token =>
        {
            // Only the invocation itself is guarded; failures while notifying the observer are not caught here.
            try
            {
                action(<#=argNames#>);
            }
            catch (Exception error)
            {
                await observer.OnErrorAsync(error).RendezVous(scheduler, token);
                return;
            }

            await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, token);
            await observer.OnCompletedAsync().RendezVous(scheduler, token);
        });
    }

<#
}
#>
}
- }
|