// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

<#@ template debug="false" hostspecific="false" language="C#" #>
<#@ assembly name="System.Core" #>
<#@ import namespace="System.Linq" #>
<#@ import namespace="System.Text" #>
<#@ import namespace="System.Collections.Generic" #>
<#@ output extension=".cs" #>
using System.Collections.Generic;
using System.Threading.Tasks;

namespace System.Reactive.Joins
{
<#
// Emits one family of plan classes per arity, from 1 up to 16 sources.
//
// NOTE(review): the generic type arguments on Activate's parameters, on the
// AsyncPlan<TResult> base class and on CreateObserver were reconstructed after
// being lost from this file. Confirm them against the hand-written
// AsyncPlan<TResult> declaration before regenerating the output.
for (var i = 1; i <= 16; i++)
{
    // "TSource1, TSource2, ..." : generic parameter list for this arity.
    var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j));

    // "arg1, arg2, ..." : argument names used for lambdas and forwarding calls.
    var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));

    // "TSource1 arg1, TSource2 arg2, ..." : typed parameter list for EvalAsync.
    var evalPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j + " arg" + j));
#>
    /// <summary>
    /// Plan whose result is produced by a synchronous selector; the selector's
    /// return value is wrapped in an already-completed task.
    /// </summary>
    internal sealed class AsyncPlan<<#=genArgs#>, TResult> : AsyncPlanBase<<#=genArgs#>, TResult>
    {
        private readonly Func<<#=genArgs#>, TResult> _selector;

        internal AsyncPlan(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, TResult> selector)
            : base(expression)
        {
            _selector = selector;
        }

        protected override Task<TResult> EvalAsync(<#=evalPars#>) => Task.FromResult(_selector(<#=pars#>));
    }

    /// <summary>
    /// Plan whose result is produced by an asynchronous selector; the task
    /// returned by the selector is handed back to the caller as-is.
    /// </summary>
    internal sealed class AsyncPlanWithTask<<#=genArgs#>, TResult> : AsyncPlanBase<<#=genArgs#>, TResult>
    {
        private readonly Func<<#=genArgs#>, Task<TResult>> _selector;

        internal AsyncPlanWithTask(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, Task<TResult>> selector)
            : base(expression)
        {
            _selector = selector;
        }

        protected override Task<TResult> EvalAsync(<#=evalPars#>) => _selector(<#=pars#>);
    }

    /// <summary>
    /// Shared activation logic for plans over <#=i#> source(s). Derived classes
    /// only supply <c>EvalAsync</c>, which maps the source values to a result.
    /// </summary>
    internal abstract class AsyncPlanBase<<#=genArgs#>, TResult> : AsyncPlan<TResult>
    {
        private readonly AsyncPattern<<#=genArgs#>> _expression;

        internal AsyncPlanBase(AsyncPattern<<#=genArgs#>> expression)
        {
            _expression = expression;
        }

        /// <summary>
        /// Computes the plan's result from one value per source.
        /// </summary>
        protected abstract Task<TResult> EvalAsync(<#=evalPars#>); // REVIEW: Consider the use of ValueTask<TResult>.

        /// <summary>
        /// Creates a join observer for every source of the pattern, builds the
        /// active plan on top of them, registers the active plan with each join
        /// observer and returns it.
        /// </summary>
        /// <param name="externalSubscriptions">Passed through to <c>CreateObserver</c> for each source.</param>
        /// <param name="observer">Receives evaluated results via <c>OnNextAsync</c> and failures via <c>OnErrorAsync</c>.</param>
        /// <param name="deactivate">Awaited with the active plan at the end of the plan's second callback.</param>
        /// <returns>The newly created active plan.</returns>
        internal override ActiveAsyncPlan Activate(Dictionary<object, IAsyncJoinObserver> externalSubscriptions, IAsyncObserver<TResult> observer, Func<ActiveAsyncPlan, Task> deactivate)
        {
            var onError = new Func<Exception, Task>(observer.OnErrorAsync);

<#
    for (var j = 1; j <= i; j++)
    {
#>
            var joinObserver<#=j#> = AsyncPlan<TResult>.CreateObserver<TSource<#=j#>>(externalSubscriptions, _expression.Source<#=j#>, onError);
<#
    }
#>

            // Declared before it is assigned because the second callback below
            // captures the variable and passes it to RemoveActivePlan/deactivate.
            var activePlan = default(ActiveAsyncPlan<<#=genArgs#>>);

            activePlan = new ActiveAsyncPlan<<#=genArgs#>>(
<#
    for (var j = 1; j <= i; j++)
    {
#>
                joinObserver<#=j#>,
<#
    }
#>
                // First callback: evaluate the selector for one set of source values.
                async (<#=pars#>) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(<#=pars#>).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing selector is reported to the observer and no
                        // value is emitted for this set of arguments.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                // Second callback: detach this plan from every join observer,
                // then notify the caller-supplied deactivate delegate.
                async () =>
                {
<#
    for (var j = 1; j <= i; j++)
    {
#>
                    await joinObserver<#=j#>.RemoveActivePlan(activePlan).ConfigureAwait(false);
<#
    }
#>
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

<#
    for (var j = 1; j <= i; j++)
    {
#>
            joinObserver<#=j#>.AddActivePlan(activePlan);
<#
    }
#>

            return activePlan;
        }
    }

<#
}
#>
}