// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. <#@ template debug="false" hostspecific="false" language="C#" #> <#@ assembly name="System.Core" #> <#@ import namespace="System.Linq" #> <#@ import namespace="System.Text" #> <#@ import namespace="System.Collections.Generic" #> <#@ output extension=".cs" #> using System.Collections.Generic; using System.Threading.Tasks; namespace System.Reactive.Joins { <# for (var i = 1; i <= 16; i++) { var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j)); var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IObservable source" + j)); var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j)); #> public class AsyncPlan<<#=genArgs#>, TResult> : AsyncPlan { public AsyncPattern<<#=genArgs#>> Expression { get; } public Func<<#=genArgs#>, TResult> Selector { get; } internal AsyncPlan(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, TResult> selector) { Expression = expression; Selector = selector; } internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate) { var onError = new Func(observer.OnErrorAsync); <# for (var j = 1; j <= i; j++) { #> var joinObserver<#=j#> = AsyncPlan.CreateObserver>(externalSubscriptions, this.Expression.Source<#=j#>, onError); <# } #> var activePlan = default(ActiveAsyncPlan<<#=genArgs#>>); activePlan = new ActiveAsyncPlan<<#=genArgs#>>( <# for (var j = 1; j <= i; j++) { #> joinObserver<#=j#>, <# } #> async (<#=pars#>) => { var res = default(TResult); try { res = Selector(<#=pars#>); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); }, async () => { <# for (var j = 1; j <= i; j++) { #> await joinObserver<#=j#>.RemoveActivePlan(activePlan).ConfigureAwait(false); <# } #> await deactivate(activePlan).ConfigureAwait(false); } ); <# for (var j = 1; j <= i; j++) { #> joinObserver<#=j#>.AddActivePlan(activePlan); <# } #> return activePlan; } } <# } #> }