// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

<#@ template debug="false" hostspecific="false" language="C#" #>
<#@ assembly name="System.Core" #>
<#@ import namespace="System.Linq" #>
<#@ import namespace="System.Text" #>
<#@ import namespace="System.Collections.Generic" #>
<#@ output extension=".cs" #>
using System.Collections.Generic;
using System.Threading.Tasks;

namespace System.Reactive.Joins
{
<#
// Emit one family of plan types (AsyncPlan, AsyncPlanWithTask, AsyncPlanBase)
// for every arity from 1 through 16 source sequences.
for (var i = 1; i <= 16; i++)
{
    // "TSource1, TSource2, ..., TSource<i>" - the generic type argument list.
    var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j));

    // "arg1, arg2, ..., arg<i>" - argument names, used for lambda parameters and call sites.
    var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));

    // "TSource1 arg1, ..., TSource<i> arg<i>" - typed parameter list for EvalAsync.
    var evalPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j + " arg" + j));
#>
    /// <summary>
    /// Plan whose result is produced by a synchronous selector; the selector's
    /// return value is wrapped in an already-completed task.
    /// </summary>
    internal sealed class AsyncPlan<<#=genArgs#>, TResult> : AsyncPlanBase<<#=genArgs#>, TResult>
    {
        private readonly Func<<#=genArgs#>, TResult> _selector;

        internal AsyncPlan(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, TResult> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Invokes the selector synchronously and wraps its result using Task.FromResult.
        protected override Task<TResult> EvalAsync(<#=evalPars#>) => Task.FromResult(_selector(<#=pars#>));
    }

    /// <summary>
    /// Plan whose result is produced by an asynchronous selector; the task
    /// returned by the selector is handed back as-is.
    /// </summary>
    internal sealed class AsyncPlanWithTask<<#=genArgs#>, TResult> : AsyncPlanBase<<#=genArgs#>, TResult>
    {
        private readonly Func<<#=genArgs#>, Task<TResult>> _selector;

        internal AsyncPlanWithTask(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, Task<TResult>> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns the selector's task directly; no additional wrapping is applied.
        protected override Task<TResult> EvalAsync(<#=evalPars#>) => _selector(<#=pars#>);
    }

    /// <summary>
    /// Shared base for the plan types of this arity. Stores the pattern whose
    /// sources are joined and implements activation; derived types only supply
    /// the result computation through <c>EvalAsync</c>.
    /// </summary>
    internal abstract class AsyncPlanBase<<#=genArgs#>, TResult> : AsyncPlan<TResult>
    {
        private readonly AsyncPattern<<#=genArgs#>> _expression;

        internal AsyncPlanBase(AsyncPattern<<#=genArgs#>> expression)
        {
            _expression = expression;
        }

        /// <summary>
        /// Computes the plan's result from one value per source.
        /// </summary>
        protected abstract Task<TResult> EvalAsync(<#=evalPars#>); // REVIEW: Consider the use of ValueTask<TResult>.

        /// <summary>
        /// Creates a join observer for every source of the pattern, builds the active
        /// plan over those observers, registers the plan with each of them, and
        /// returns it.
        /// </summary>
        /// <param name="externalSubscriptions">
        /// Passed through to <c>CreateObserver</c> for every source. Presumably the shared
        /// lookup of join observers per source - confirm against <c>CreateObserver</c>.
        /// </param>
        /// <param name="observer">Receives computed results and any evaluation error.</param>
        /// <param name="deactivate">
        /// Awaited with the active plan after the plan has been removed from all of its join observers.
        /// </param>
        internal override ActiveAsyncPlan Activate(Dictionary<object, IAsyncJoinObserver> externalSubscriptions, IAsyncObserver<TResult> observer, Func<ActiveAsyncPlan, Task> deactivate)
        {
            var onError = new Func<Exception, Task>(observer.OnErrorAsync);

<#
// One join observer per source of the pattern.
for (var j = 1; j <= i; j++)
{
#>
            var joinObserver<#=j#> = AsyncPlan<TResult>.CreateObserver<TSource<#=j#>>(externalSubscriptions, _expression.Source<#=j#>, onError);
<#
}
#>

            // Declared and assigned separately: the second lambda below captures this
            // variable, and C# requires it to be definitely assigned at that point.
            var activePlan = default(ActiveAsyncPlan<<#=genArgs#>>);

            activePlan = new ActiveAsyncPlan<<#=genArgs#>>(
<#
for (var j = 1; j <= i; j++)
{
#>
                joinObserver<#=j#>,
<#
}
#>
                // Value callback (presumably invoked by ActiveAsyncPlan when every source
                // has a value available - confirm against ActiveAsyncPlan).
                async (<#=pars#>) =>
                {
                    var res = default(TResult);

                    try
                    {
                        // Both the call and the await are inside the try, so a selector that
                        // throws synchronously and one that returns a faulted task are
                        // reported the same way.
                        res = await EvalAsync(<#=pars#>).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    // Deliberately outside the try: exceptions thrown by the observer's
                    // OnNextAsync are not routed to OnErrorAsync.
                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                // Teardown callback: detach the plan from every join observer, then notify
                // the caller through the deactivate delegate.
                async () =>
                {
<#
for (var j = 1; j <= i; j++)
{
#>
                    await joinObserver<#=j#>.RemoveActivePlan(activePlan).ConfigureAwait(false);
<#
}
#>
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

<#
// Register the fully constructed plan with each of its join observers.
for (var j = 1; j <= i; j++)
{
#>
            joinObserver<#=j#>.AddActivePlan(activePlan);
<#
}
#>

            return activePlan;
        }
    }

<#
}
#>
}