// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. <#@ template debug="false" hostspecific="false" language="C#" #> <#@ assembly name="System.Core" #> <#@ import namespace="System.Linq" #> <#@ import namespace="System.Text" #> <#@ import namespace="System.Collections.Generic" #> <#@ output extension=".cs" #> using System.Threading.Tasks; namespace System.Reactive.Joins { <# for (var i = 1; i <= 16; i++) { var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j)); var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObservable source" + j)); var observers = string.Join(", ", Enumerable.Range(1, i).Select(j => "AsyncJoinObserver observer" + j)); #> internal sealed class ActiveAsyncPlan<<#=genArgs#>> : ActiveAsyncPlan { private readonly Func<<#=genArgs#>, Task> _onNext; private readonly Func _onCompleted; <# for (var j = 1; j <= i; j++) { #> private readonly AsyncJoinObserver> _observer<#=j#>; <# } #> internal ActiveAsyncPlan(<#=observers#>, Func<<#=genArgs#>, Task> onNext, Func onCompleted) { _onNext = onNext; _onCompleted = onCompleted; <# for (var j = 1; j <= i; j++) { #> _observer<#=j#> = observer<#=j#>; <# } #> <# for (var j = 1; j <= i; j++) { #> AddJoinObserver(observer<#=j#>); <# } #> } internal override Task Match() { if (<#=string.Join(" && ", Enumerable.Range(1, i).Select(j => "_observer" + j + ".Queue.Count > 0"))#>) { <# for (var j = 1; j <= i; j++) { #> var notification<#=j#> = _observer<#=j#>.Queue.Peek(); <# } #> if (<#=string.Join(" || ", Enumerable.Range(1, i).Select(j => "notification" + j + ".Kind == NotificationKind.OnCompleted"))#>) { return _onCompleted(); } Dequeue(); return _onNext(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "notification" + j + ".Value"))#>); } return Task.CompletedTask; } } <# } #> }