// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

<#@ template debug="false" hostspecific="false" language="C#" #>
<#@ assembly name="System.Core" #>
<#@ import namespace="System.Linq" #>
<#@ import namespace="System.Text" #>
<#@ import namespace="System.Collections.Generic" #>
<#@ output extension=".cs" #>
using System.Reactive.Concurrency;
using System.Reactive.Subjects;

namespace System.Reactive.Linq
{
    // REVIEW: Consider if these are worth retaining in the async space.

    partial class AsyncObservable
    {
<#
// Func<T1, ..., Ti, TResult> overloads, generated for arities 0 through 16.
for (var i = 0; i <= 16; i++)
{
    // Type arguments of the returned delegate: T1..Ti followed by the observable result type.
    var args = Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "IAsyncObservable<TResult>" });
    // Return type of the generated method, e.g. Func<T1, T2, IAsyncObservable<TResult>>.
    var ret = "Func<" + string.Join(", ", args) + ">";
    // Generic parameter list of the generated method, e.g. "T1, T2, TResult".
    var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
    // Lambda parameter / call argument list, e.g. "arg1, arg2" (empty for arity 0).
    var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));
    // Type of the delegate being wrapped; it takes exactly the method's generic parameters.
    var type = "Func<" + genArgs + ">";
#>
        /// <summary>
        /// Wraps <paramref name="function"/> in a delegate that, on each invocation, schedules the
        /// function on <see cref="TaskPoolAsyncScheduler.Default"/> and returns an observable sequence
        /// carrying its result or its exception.
        /// </summary>
        /// <param name="function">The function to wrap.</param>
        /// <returns>A delegate with the same parameters as <paramref name="function"/> that returns an observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
        public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function)
        {
            if (function == null)
                throw new ArgumentNullException(nameof(function));

            return ToAsync(function, TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Wraps <paramref name="function"/> in a delegate that, on each invocation, schedules the
        /// function on <paramref name="scheduler"/> and returns an observable sequence carrying its
        /// result or its exception.
        /// </summary>
        /// <param name="function">The function to wrap.</param>
        /// <param name="scheduler">The scheduler on which each invocation of <paramref name="function"/> is scheduled.</param>
        /// <returns>A delegate with the same parameters as <paramref name="function"/> that returns an observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="function"/> or <paramref name="scheduler"/> is null.</exception>
        public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function, IAsyncScheduler scheduler)
        {
            if (function == null)
                throw new ArgumentNullException(nameof(function));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return (<#=pars#>) =>
            {
                // A fresh subject per invocation, so every call of the returned delegate gets its own sequence.
                var subject = new SequentialAsyncAsyncSubject<TResult>();

                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.

                scheduler.ScheduleAsync(async ct =>
                {
                    TResult res;
                    try
                    {
                        res = function(<#=pars#>);
                    }
                    catch (Exception ex)
                    {
                        // The function failed: forward the exception to the subject and emit nothing else.
                        // NOTE(review): RendezVous(scheduler) presumably resumes the continuation on the scheduler - confirm.
                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
                        return;
                    }

                    // Success: publish the single result, then complete the sequence.
                    await subject.OnNextAsync(res).RendezVous(scheduler);
                    await subject.OnCompletedAsync().RendezVous(scheduler);
                });

                return subject.AsAsyncObservable();
            };
        }

<#
}
#>
<#
// Action<T1, ..., Ti> overloads, generated for arities 0 through 16.
for (var i = 0; i <= 16; i++)
{
    // Type arguments of the returned delegate: T1..Ti followed by the observable of Unit.
    var args = Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "IAsyncObservable<Unit>" });
    // Return type of the generated method, e.g. Func<T1, T2, IAsyncObservable<Unit>>.
    var ret = "Func<" + string.Join(", ", args) + ">";
    // Generic parameter list; empty for arity 0, in which case the method is non-generic.
    var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
    // Lambda parameter / call argument list, e.g. "arg1, arg2" (empty for arity 0).
    var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));
    // Type of the delegate being wrapped: plain Action for arity 0, Action<T1, ..., Ti> otherwise.
    var type = "Action";

    if (genArgs != "")
    {
        genArgs = "<" + genArgs + ">";
        type += genArgs;
    }
#>
        /// <summary>
        /// Wraps <paramref name="action"/> in a delegate that, on each invocation, schedules the
        /// action on <see cref="TaskPoolAsyncScheduler.Default"/> and returns an observable sequence
        /// that signals <see cref="Unit.Default"/> on success or the action's exception on failure.
        /// </summary>
        /// <param name="action">The action to wrap.</param>
        /// <returns>A delegate with the same parameters as <paramref name="action"/> that returns an observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action)
        {
            if (action == null)
                throw new ArgumentNullException(nameof(action));

            return ToAsync(action, TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Wraps <paramref name="action"/> in a delegate that, on each invocation, schedules the
        /// action on <paramref name="scheduler"/> and returns an observable sequence that signals
        /// <see cref="Unit.Default"/> on success or the action's exception on failure.
        /// </summary>
        /// <param name="action">The action to wrap.</param>
        /// <param name="scheduler">The scheduler on which each invocation of <paramref name="action"/> is scheduled.</param>
        /// <returns>A delegate with the same parameters as <paramref name="action"/> that returns an observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> or <paramref name="scheduler"/> is null.</exception>
        public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action, IAsyncScheduler scheduler)
        {
            if (action == null)
                throw new ArgumentNullException(nameof(action));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return (<#=pars#>) =>
            {
                // A fresh subject per invocation, so every call of the returned delegate gets its own sequence.
                var subject = new SequentialAsyncAsyncSubject<Unit>();

                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.

                scheduler.ScheduleAsync(async ct =>
                {
                    try
                    {
                        action(<#=pars#>);
                    }
                    catch (Exception ex)
                    {
                        // The action failed: forward the exception to the subject and emit nothing else.
                        // NOTE(review): RendezVous(scheduler) presumably resumes the continuation on the scheduler - confirm.
                        await subject.OnErrorAsync(ex).RendezVous(scheduler);
                        return;
                    }

                    // Success: there is no result value, so publish Unit and complete the sequence.
                    await subject.OnNextAsync(Unit.Default).RendezVous(scheduler);
                    await subject.OnCompletedAsync().RendezVous(scheduler);
                });

                return subject.AsAsyncObservable();
            };
        }

<#
}
#>
    }
}