// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. <#@ template debug="false" hostspecific="false" language="C#" #> <#@ assembly name="System.Core" #> <#@ import namespace="System.Linq" #> <#@ import namespace="System.Text" #> <#@ import namespace="System.Collections.Generic" #> <#@ output extension=".cs" #> using System.Reactive.Concurrency; using System.Reactive.Subjects; using System.Threading.Tasks; namespace System.Reactive.Linq { // REVIEW: Consider if these are worth retaining in the async space. partial class AsyncObservable { <# for (var i = 0; i <= 16; i++) { var args = Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "IAsyncObservable" }); var ret = "Func<" + string.Join(", ", args) + ">"; var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" })); var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j)); var type = "Func<" + string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" })) + ">"; var invokeArgs = pars; if (i > 0) { invokeArgs = ", " + invokeArgs; } #> public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function) { if (function == null) throw new ArgumentNullException(nameof(function)); return ToAsync(function, TaskPoolAsyncScheduler.Default); } public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function, IAsyncScheduler scheduler) { if (function == null) throw new ArgumentNullException(nameof(function)); if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); return (<#=pars#>) => { var subject = new SequentialAsyncAsyncSubject(); AsyncObserver.ToAsync(subject, function<#=invokeArgs#>); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled. return subject.AsAsyncObservable(); }; } <# } #> <# for (var i = 0; i <= 16; i++) { var args = Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "IAsyncObservable" }); var ret = "Func<" + string.Join(", ", args) + ">"; var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j)); var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j)); var type = "Action"; if (genArgs != "") { genArgs = "<" + genArgs + ">"; type += genArgs; } var invokeArgs = pars; if (i > 0) { invokeArgs = ", " + invokeArgs; } #> public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action) { if (action == null) throw new ArgumentNullException(nameof(action)); return ToAsync(action, TaskPoolAsyncScheduler.Default); } public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action, IAsyncScheduler scheduler) { if (action == null) throw new ArgumentNullException(nameof(action)); if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); return (<#=pars#>) => { var subject = new SequentialAsyncAsyncSubject(); AsyncObserver.ToAsync(subject, action<#=invokeArgs#>); // NB: We don't do anything with the result of scheduling the action; it can't be cancelled. return subject.AsAsyncObservable(); }; } <# } #> } partial class AsyncObserver { <# for (var i = 0; i <= 16; i++) { var args = Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "IAsyncObservable" }); var ret = "Func<" + string.Join(", ", args) + ">"; var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" })); var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j)); var type = "Func<" + string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" })) + ">"; var invokePars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j + " arg" + j)); var invokeArgs = pars; if (i > 0) { invokePars = ", " + invokePars; invokeArgs += ", "; } #> public static Task ToAsync<<#=genArgs#>>(IAsyncObserver observer, <#=type#> function<#=invokePars#>) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (function == null) throw new ArgumentNullException(nameof(function)); return ToAsync(observer, function, <#=invokeArgs#>TaskPoolAsyncScheduler.Default); } public static Task ToAsync<<#=genArgs#>>(IAsyncObserver observer, <#=type#> function<#=invokePars#>, IAsyncScheduler scheduler) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (function == null) throw new ArgumentNullException(nameof(function)); if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); return scheduler.ScheduleAsync(async ct => { TResult res; try { res = function(<#=pars#>); } catch (Exception ex) { await observer.OnErrorAsync(ex).RendezVous(scheduler, ct); return; } await observer.OnNextAsync(res).RendezVous(scheduler, ct); await observer.OnCompletedAsync().RendezVous(scheduler, ct); }); } <# } #> <# for (var i = 0; i <= 16; i++) { var args = Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "IAsyncObservable" }); var ret = "Func<" + string.Join(", ", args) + ">"; var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j)); var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j)); var type = "Action"; if (genArgs != "") { genArgs = "<" + genArgs + ">"; type += genArgs; } var invokePars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j + " arg" + j)); var invokeArgs = pars; if (i > 0) { invokePars = ", " + invokePars; invokeArgs += ", "; } #> public static Task ToAsync<#=genArgs#>(IAsyncObserver observer, <#=type#> action<#=invokePars#>) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (action == null) throw new ArgumentNullException(nameof(action)); return ToAsync(observer, action, <#=invokeArgs#>TaskPoolAsyncScheduler.Default); } public static Task ToAsync<#=genArgs#>(IAsyncObserver observer, <#=type#> action<#=invokePars#>, IAsyncScheduler scheduler) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (action == null) throw new ArgumentNullException(nameof(action)); if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); return scheduler.ScheduleAsync(async ct => { try { action(<#=pars#>); } catch (Exception ex) { await observer.OnErrorAsync(ex).RendezVous(scheduler, ct); return; } await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct); await observer.OnCompletedAsync().RendezVous(scheduler, ct); }); } <# } #> } }