// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

<#@ template debug="false" hostspecific="false" language="C#" #>
<#@ assembly name="System.Core" #>
<#@ import namespace="System.Linq" #>
<#@ import namespace="System.Text" #>
<#@ import namespace="System.Collections.Generic" #>
<#@ output extension=".cs" #>
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    partial class AsyncObservable
    {
<#
// One family of CombineLatest overloads is generated per arity, from 2 through 15 sources.
for (var i = 2; i <= 15; i++)
{
    // "T1, ..., Ti, TResult": type parameters of the selector-based overloads.
    var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));

    // "T1, ..., Ti": type parameters of the tuple overload and the selector's inputs.
    var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));

    // "IAsyncObservable<T1> source1, ..., IAsyncObservable<Ti> sourcei".
    // Each source must carry its own element type so that T1..Ti are bound by the parameter list.
    var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObservable<T" + j + "> source" + j));

    // "(observer1, ..., observeri)": deconstruction target for the per-source observers.
    var obs = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "observer" + j)) + ")";

    // "(T1, ..., Ti)": element type of the tuple overload.
    var tuple = "(" + genArgs + ")";

    // "sub1, ..., subi": the subscription tasks awaited together.
    var subs = string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j));
#>
        /// <summary>
        /// Combines <#=i#> asynchronous observable sequences into a single sequence of tuples that hold
        /// the most recently observed element of every source.
        /// </summary>
        /// <returns>
        /// A sequence built with <c>Create</c>; on subscription it subscribes to every source using the
        /// observers produced by <c>AsyncObserver.CombineLatest</c> and returns a composite disposable
        /// holding all source subscriptions.
        /// </returns>
        /// <exception cref="ArgumentNullException">One of the sources is null.</exception>
        public static IAsyncObservable<<#=tuple#>> CombineLatest<<#=genArgs#>>(this <#=args#>)
        {
<#
    for (var j = 1; j <= i; j++)
    {
#>
            if (source<#=j#> == null)
                throw new ArgumentNullException(nameof(source<#=j#>));
<#
    }
#>

            return Create<<#=tuple#>>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var <#=obs#> = AsyncObserver.CombineLatest(observer);

                // All subscriptions are started before any of them is awaited. Each continuation runs
                // after its subscribe task has finished, so reading Result there does not block; it
                // adds the obtained disposable to d.
<#
    for (var j = 1; j <= i; j++)
    {
#>
                var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
<#
    }
#>

                await Task.WhenAll(<#=subs#>).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Combines <#=i#> asynchronous observable sequences into a single sequence whose elements are
        /// computed by applying <c>selector</c> to the most recently observed element of every source.
        /// </summary>
        /// <returns>
        /// A sequence built with <c>Create</c>; on subscription it subscribes to every source using the
        /// observers produced by <c>AsyncObserver.CombineLatest</c> and returns a composite disposable
        /// holding all source subscriptions.
        /// </returns>
        /// <exception cref="ArgumentNullException">One of the sources or <c>selector</c> is null.</exception>
        public static IAsyncObservable<TResult> CombineLatest<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, TResult> selector)
        {
<#
    for (var j = 1; j <= i; j++)
    {
#>
            if (source<#=j#> == null)
                throw new ArgumentNullException(nameof(source<#=j#>));
<#
    }
#>
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var <#=obs#> = AsyncObserver.CombineLatest(observer, selector);

                // All subscriptions are started before any of them is awaited. Each continuation runs
                // after its subscribe task has finished, so reading Result there does not block; it
                // adds the obtained disposable to d.
<#
    for (var j = 1; j <= i; j++)
    {
#>
                var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
<#
    }
#>

                await Task.WhenAll(<#=subs#>).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Combines <#=i#> asynchronous observable sequences into a single sequence whose elements are
        /// computed by awaiting the asynchronous <c>selector</c> applied to the most recently observed
        /// element of every source.
        /// </summary>
        /// <returns>
        /// A sequence built with <c>Create</c>; on subscription it subscribes to every source using the
        /// observers produced by <c>AsyncObserver.CombineLatest</c> and returns a composite disposable
        /// holding all source subscriptions.
        /// </returns>
        /// <exception cref="ArgumentNullException">One of the sources or <c>selector</c> is null.</exception>
        public static IAsyncObservable<TResult> CombineLatest<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, Task<TResult>> selector)
        {
<#
    for (var j = 1; j <= i; j++)
    {
#>
            if (source<#=j#> == null)
                throw new ArgumentNullException(nameof(source<#=j#>));
<#
    }
#>
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var <#=obs#> = AsyncObserver.CombineLatest(observer, selector);

                // All subscriptions are started before any of them is awaited. Each continuation runs
                // after its subscribe task has finished, so reading Result there does not block; it
                // adds the obtained disposable to d.
<#
    for (var j = 1; j <= i; j++)
    {
#>
                var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
<#
    }
#>

                await Task.WhenAll(<#=subs#>).ConfigureAwait(false);

                return d;
            });
        }

<#
}
#>
    }

    partial class AsyncObserver
    {
<#
// The observer-side factories mirror the arities generated for AsyncObservable above.
for (var i = 2; i <= 15; i++)
{
    // "(IAsyncObserver<T1>, ..., IAsyncObserver<Ti>)": one typed observer per source.
    var res = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObserver<T" + j + ">")) + ")";

    // "T1, ..., Ti, TResult"
    var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));

    // "T1, ..., Ti"
    var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));

    // "(T1, ..., Ti)"
    var tuple = "(" + genArgs + ")";

    // Expressions that do not depend on the observer index are built once per arity.
    // "hasValue1 && ... && hasValuei"
    var allHaveValue = string.Join(" && ", Enumerable.Range(1, i).Select(k => "hasValue" + k));

    // "isDone1 && ... && isDonei"
    var allDone = string.Join(" && ", Enumerable.Range(1, i).Select(k => "isDone" + k));

    // "latestValue1, ..., latestValuei"
    var latest = string.Join(", ", Enumerable.Range(1, i).Select(k => "latestValue" + k));

    // "x1, ..., xi": lambda parameters used when adapting a synchronous selector.
    var xs = string.Join(", ", Enumerable.Range(1, i).Select(k => "x" + k));
#>
        /// <summary>
        /// Creates <#=i#> observers, one per source, that share state and forward a tuple of the latest
        /// value of every source to <c>observer</c>.
        /// </summary>
        /// <remarks>
        /// All callbacks run under one shared <c>AsyncLock</c>. A tuple is emitted on every element once
        /// each source has produced at least one element. Before that point, an element arriving while
        /// all other sources are already done completes <c>observer</c>. Errors are forwarded as they
        /// arrive; completion is forwarded once every source is done.
        /// </remarks>
        /// <returns>A tuple with one observer per source, in source order.</returns>
        /// <exception cref="ArgumentNullException"><c>observer</c> is null.</exception>
        public static <#=res#> CombineLatest<<#=genArgs#>>(IAsyncObserver<<#=tuple#>> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            bool allHasValue = false;

<#
    for (var j = 1; j <= i; j++)
    {
#>
            bool hasValue<#=j#> = false;
            bool isDone<#=j#> = false;
            T<#=j#> latestValue<#=j#> = default(T<#=j#>);
<#
    }
#>

            var gate = new AsyncLock();

            return
            (
<#
    for (var j = 1; j <= i; j++)
    {
        // "isDoneK && ..." over every source except the current one.
        var othersDone = string.Join(" && ", Enumerable.Range(1, i).Where(k => k != j).Select(k => "isDone" + k));
#>
                Create<T<#=j#>>(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            if (!hasValue<#=j#>)
                            {
                                hasValue<#=j#> = true;
                                allHasValue = <#=allHaveValue#>;
                            }

                            latestValue<#=j#> = x;

                            if (allHasValue)
                            {
                                await observer.OnNextAsync((<#=latest#>)).ConfigureAwait(false);
                            }
                            else if (<#=othersDone#>)
                            {
                                // Not every source has a value and all other sources are already done.
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone<#=j#> = true;

                            if (<#=allDone#>)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                )<#=(j < i ? "," : "")#>
<#
    }
#>
            );
        }

        /// <summary>
        /// Creates <#=i#> observers, one per source, that forward the result of the synchronous
        /// <c>selector</c> applied to the latest value of every source to <c>observer</c>.
        /// </summary>
        /// <remarks>
        /// Delegates to the overload taking an asynchronous selector by wrapping the result of
        /// <c>selector</c> with <c>Task.FromResult</c>.
        /// </remarks>
        /// <returns>A tuple with one observer per source, in source order.</returns>
        /// <exception cref="ArgumentNullException"><c>observer</c> or <c>selector</c> is null.</exception>
        public static <#=res#> CombineLatest<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, TResult> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return CombineLatest<<#=genPars#>>(observer, (<#=xs#>) => Task.FromResult(selector(<#=xs#>)));
        }

        /// <summary>
        /// Creates <#=i#> observers, one per source, that forward the awaited result of the asynchronous
        /// <c>selector</c> applied to the latest value of every source to <c>observer</c>.
        /// </summary>
        /// <remarks>
        /// All callbacks run under one shared <c>AsyncLock</c>. The selector is invoked on every element
        /// once each source has produced at least one element; an exception thrown while invoking or
        /// awaiting it is passed to <c>observer.OnErrorAsync</c> and nothing is emitted for that element.
        /// Before every source has a value, an element arriving while all other sources are already done
        /// completes <c>observer</c>. Source errors are forwarded as they arrive; completion is forwarded
        /// once every source is done.
        /// </remarks>
        /// <returns>A tuple with one observer per source, in source order.</returns>
        /// <exception cref="ArgumentNullException"><c>observer</c> or <c>selector</c> is null.</exception>
        public static <#=res#> CombineLatest<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, Task<TResult>> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            bool allHasValue = false;

<#
    for (var j = 1; j <= i; j++)
    {
#>
            bool hasValue<#=j#> = false;
            bool isDone<#=j#> = false;
            T<#=j#> latestValue<#=j#> = default(T<#=j#>);
<#
    }
#>

            var gate = new AsyncLock();

            return
            (
<#
    for (var j = 1; j <= i; j++)
    {
        // "isDoneK && ..." over every source except the current one.
        var othersDone = string.Join(" && ", Enumerable.Range(1, i).Where(k => k != j).Select(k => "isDone" + k));
#>
                Create<T<#=j#>>(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            if (!hasValue<#=j#>)
                            {
                                hasValue<#=j#> = true;
                                allHasValue = <#=allHaveValue#>;
                            }

                            latestValue<#=j#> = x;

                            if (allHasValue)
                            {
                                TResult res;

                                try
                                {
                                    res = await selector(<#=latest#>).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    // A failing selector is reported downstream instead of propagating to the source.
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                await observer.OnNextAsync(res).ConfigureAwait(false);
                            }
                            else if (<#=othersDone#>)
                            {
                                // Not every source has a value and all other sources are already done.
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone<#=j#> = true;

                            if (<#=allDone#>)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                )<#=(j < i ? "," : "")#>
<#
    }
#>
            );
        }

<#
}
#>
    }
}