| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 | 
							- // Licensed to the .NET Foundation under one or more agreements.
 
- // The .NET Foundation licenses this file to you under the MIT License.
 
- // See the LICENSE file in the project root for more information. 
 
- <#@ template debug="false" hostspecific="false" language="C#" #>
 
- <#@ assembly name="System.Core" #>
 
- <#@ import namespace="System.Linq" #>
 
- <#@ import namespace="System.Text" #>
 
- <#@ import namespace="System.Collections.Generic" #>
 
- <#@ output extension=".cs" #>
 
- using System.Reactive.Disposables;
 
- using System.Threading;
 
- using System.Threading.Tasks;
 
- namespace System.Reactive.Linq
 
- {
 
-     public partial class AsyncObservable
 
-     {
 
- <#
 
- // Emit one group of AsyncObservable.CombineLatest overloads for every arity i from 2 to 15.
 
- for (var i = 2; i <= 15; i++)
 
- {
 
-     // Generic parameter list including the result type: "T1, ..., Ti, TResult".
 
-     var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
 
-     // Generic parameter list of the sources only: "T1, ..., Ti".
 
-     var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
 
-     // Method parameter list: "IAsyncObservable<T1> source1, ..., IAsyncObservable<Ti> sourcei".
 
-     var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObservable<T" + j + "> source" + j));
 
-     // Deconstruction pattern for the per-source observers: "(observer1, ..., observeri)".
 
-     var obs = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "observer" + j)) + ")";
 
-     // Tuple type of the source element types: "(T1, ..., Ti)".
 
-     var tuple = "(" + genArgs + ")";
 
- #>
 
-         /// <summary>
 
-         /// Combines the given sources into a single sequence of tuples. Each source is subscribed
 
-         /// with one of the observers produced by AsyncObserver.CombineLatest, and all resulting
 
-         /// subscriptions are gathered into one composite disposable handed back to the subscriber.
 
-         /// </summary>
 
-         /// <returns>An observable sequence of tuples built from the sources.</returns>
 
-         /// <exception cref="ArgumentNullException">One of the sources is null.</exception>
 
-         public static IAsyncObservable<<#=tuple#>> CombineLatest<<#=genArgs#>>(this <#=args#>)
 
-         {
 
- <#
 
- foreach (var n in Enumerable.Range(1, i))
 
- {
 
- #>
 
-             if (source<#=n#> == null)
 
-             {
 
-                 throw new ArgumentNullException(nameof(source<#=n#>));
 
-             }
 
- <#
 
- }
 
- #>
 
-             return Create<<#=tuple#>>(async observer =>
 
-             {
 
-                 // Holds the subscription to every source so they can be disposed together.
 
-                 var subscriptions = new CompositeAsyncDisposable();
 
-                 var <#=obs#> = AsyncObserver.CombineLatest(observer);
 
-                 // Start every subscription first; each one registers its disposable once it is available.
 
- <#
 
- foreach (var n in Enumerable.Range(1, i))
 
- {
 
- #>
 
-                 var pending<#=n#> = source<#=n#>.SubscribeSafeAsync(observer<#=n#>).AsTask().ContinueWith(t => subscriptions.AddAsync(t.Result).AsTask()).Unwrap();
 
- <#
 
- }
 
- #>
 
-                 await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(k => "pending" + k))#>).ConfigureAwait(false);
 
-                 return subscriptions;
 
-             });
 
-         }
 
-         /// <summary>
 
-         /// Combines the given sources into a single sequence whose elements are computed by a
 
-         /// synchronous selector. Each source is subscribed with one of the observers produced by
 
-         /// AsyncObserver.CombineLatest, and all resulting subscriptions are gathered into one
 
-         /// composite disposable handed back to the subscriber.
 
-         /// </summary>
 
-         /// <returns>An observable sequence of selector results.</returns>
 
-         /// <exception cref="ArgumentNullException">One of the sources or the selector is null.</exception>
 
-         public static IAsyncObservable<TResult> CombineLatest<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, TResult> selector)
 
-         {
 
- <#
 
- foreach (var n in Enumerable.Range(1, i))
 
- {
 
- #>
 
-             if (source<#=n#> == null)
 
-             {
 
-                 throw new ArgumentNullException(nameof(source<#=n#>));
 
-             }
 
- <#
 
- }
 
- #>
 
-             if (selector == null)
 
-             {
 
-                 throw new ArgumentNullException(nameof(selector));
 
-             }
 
-             return Create<TResult>(async observer =>
 
-             {
 
-                 // Holds the subscription to every source so they can be disposed together.
 
-                 var subscriptions = new CompositeAsyncDisposable();
 
-                 var <#=obs#> = AsyncObserver.CombineLatest(observer, selector);
 
-                 // Start every subscription first; each one registers its disposable once it is available.
 
- <#
 
- foreach (var n in Enumerable.Range(1, i))
 
- {
 
- #>
 
-                 var pending<#=n#> = source<#=n#>.SubscribeSafeAsync(observer<#=n#>).AsTask().ContinueWith(t => subscriptions.AddAsync(t.Result).AsTask()).Unwrap();
 
- <#
 
- }
 
- #>
 
-                 await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(k => "pending" + k))#>).ConfigureAwait(false);
 
-                 return subscriptions;
 
-             });
 
-         }
 
-         /// <summary>
 
-         /// Combines the given sources into a single sequence whose elements are computed by an
 
-         /// asynchronous (ValueTask-returning) selector. Each source is subscribed with one of the
 
-         /// observers produced by AsyncObserver.CombineLatest, and all resulting subscriptions are
 
-         /// gathered into one composite disposable handed back to the subscriber.
 
-         /// </summary>
 
-         /// <returns>An observable sequence of selector results.</returns>
 
-         /// <exception cref="ArgumentNullException">One of the sources or the selector is null.</exception>
 
-         public static IAsyncObservable<TResult> CombineLatest<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, ValueTask<TResult>> selector)
 
-         {
 
- <#
 
- foreach (var n in Enumerable.Range(1, i))
 
- {
 
- #>
 
-             if (source<#=n#> == null)
 
-             {
 
-                 throw new ArgumentNullException(nameof(source<#=n#>));
 
-             }
 
- <#
 
- }
 
- #>
 
-             if (selector == null)
 
-             {
 
-                 throw new ArgumentNullException(nameof(selector));
 
-             }
 
-             return Create<TResult>(async observer =>
 
-             {
 
-                 // Holds the subscription to every source so they can be disposed together.
 
-                 var subscriptions = new CompositeAsyncDisposable();
 
-                 var <#=obs#> = AsyncObserver.CombineLatest(observer, selector);
 
-                 // Start every subscription first; each one registers its disposable once it is available.
 
- <#
 
- foreach (var n in Enumerable.Range(1, i))
 
- {
 
- #>
 
-                 var pending<#=n#> = source<#=n#>.SubscribeSafeAsync(observer<#=n#>).AsTask().ContinueWith(t => subscriptions.AddAsync(t.Result).AsTask()).Unwrap();
 
- <#
 
- }
 
- #>
 
-                 await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(k => "pending" + k))#>).ConfigureAwait(false);
 
-                 return subscriptions;
 
-             });
 
-         }
 
- <#
 
- }
 
- #>
 
-     }
 
-     public partial class AsyncObserver
 
-     {
 
- <#
 
- // Emit one group of AsyncObserver.CombineLatest overloads for every arity i from 2 to 15.
 
- for (var i = 2; i <= 15; i++)
 
- {
 
-     // Return type, a tuple of per-source observers: "(IAsyncObserver<T1>, ..., IAsyncObserver<Ti>)".
 
-     var res = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObserver<T" + j + ">")) + ")";
 
-     // Generic parameter list including the result type: "T1, ..., Ti, TResult".
 
-     var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
 
-     // Generic parameter list of the sources only: "T1, ..., Ti".
 
-     var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
 
-     // Tuple type of the source element types: "(T1, ..., Ti)".
 
-     var tuple = "(" + genArgs + ")";
 
- #>
 
-         /// <summary>
 
-         /// Creates one observer per source; together they feed a single downstream observer of tuples.
 
-         /// Every handler does its work inside the same AsyncGate lock. Once each source has produced
 
-         /// at least one value, every incoming value causes a tuple of the latest values to be sent
 
-         /// downstream. Errors are forwarded downstream as they arrive, and completion is forwarded
 
-         /// once all sources are done.
 
-         /// </summary>
 
-         /// <param name="observer">The downstream observer receiving the combined tuples.</param>
 
-         /// <returns>A tuple of observers, one for each source, in source order.</returns>
 
-         /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
 
-         public static <#=res#> CombineLatest<<#=genArgs#>>(IAsyncObserver<<#=tuple#>> observer)
 
-         {
 
-             if (observer == null)
 
-                 throw new ArgumentNullException(nameof(observer));
 
-             // Shared state captured by all per-source observers; only read or written while holding the gate.
 
-             // allHasValue caches the conjunction of the hasValue flags and is recomputed only on a first value.
 
-             bool allHasValue = false;
 
- <#
 
- for (var j = 1; j <= i; j++)
 
- {
 
- #>
 
-             bool hasValue<#=j#> = false;
 
-             bool isDone<#=j#> = false;
 
-             T<#=j#> latestValue<#=j#> = default(T<#=j#>);
 
- <#
 
- }
 
- #>
 
-             // NOTE(review): presumably AsyncGate serializes the handlers below - confirm against its implementation.
 
-             var gate = new AsyncGate();
 
-             return
 
-             (
 
- <#
 
- for (var j = 1; j <= i; j++)
 
- {
 
- #>
 
-                 Create<T<#=j#>>(
 
-                     // OnNext: record the value; emit a tuple if every source has a value, otherwise
 
-                     // complete downstream when all of the other sources are already done.
 
-                     async x =>
 
-                     {
 
-                         using (await gate.LockAsync().ConfigureAwait(false))
 
-                         {
 
-                             if (!hasValue<#=j#>)
 
-                             {
 
-                                 hasValue<#=j#> = true;
 
-                                 allHasValue = <#=string.Join(" && ", Enumerable.Range(1, i).Select(k => "hasValue" + k))#>;
 
-                             }
 
-                             latestValue<#=j#> = x;
 
-                             if (allHasValue)
 
-                             {
 
-                                 await observer.OnNextAsync((<#=string.Join(", ", Enumerable.Range(1, i).Select(k => "latestValue" + k))#>)).ConfigureAwait(false);
 
-                             }
 
-                             else if (<#=string.Join(" && ", Enumerable.Range(1, i).Where(k => k != j).Select(k => "isDone" + k))#>)
 
-                             {
 
-                                 await observer.OnCompletedAsync().ConfigureAwait(false);
 
-                             }
 
-                         }
 
-                     },
 
-                     // OnError: forward the error downstream while holding the gate.
 
-                     async ex =>
 
-                     {
 
-                         using (await gate.LockAsync().ConfigureAwait(false))
 
-                         {
 
-                             await observer.OnErrorAsync(ex).ConfigureAwait(false);
 
-                         }
 
-                     },
 
-                     // OnCompleted: mark this source as done; complete downstream once every source is done.
 
-                     async () =>
 
-                     {
 
-                         using (await gate.LockAsync().ConfigureAwait(false))
 
-                         {
 
-                             isDone<#=j#> = true;
 
-                             if (<#=string.Join(" && ", Enumerable.Range(1, i).Select(k => "isDone" + k))#>)
 
-                             {
 
-                                 await observer.OnCompletedAsync().ConfigureAwait(false);
 
-                             }
 
-                         }
 
-                     }
 
-                 )<#=(j < i ? "," : "")#>
 
- <#
 
- }
 
- #>
 
-             );
 
-         }
 
-         /// <summary>
 
-         /// Creates the per-source observers for a synchronous selector by wrapping each selector
 
-         /// result in a ValueTask and delegating to the overload that takes an asynchronous selector.
 
-         /// </summary>
 
-         /// <param name="observer">The downstream observer receiving the selector results.</param>
 
-         /// <param name="selector">Function applied to the latest values of all sources.</param>
 
-         /// <returns>A tuple of observers, one for each source, in source order.</returns>
 
-         /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is null.</exception>
 
-         public static <#=res#> CombineLatest<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, TResult> selector)
 
-         {
 
-             if (observer == null)
 
-             {
 
-                 throw new ArgumentNullException(nameof(observer));
 
-             }
 
-             if (selector == null)
 
-             {
 
-                 throw new ArgumentNullException(nameof(selector));
 
-             }
 
-             // Lift the synchronous selector into the ValueTask-returning shape expected by the other overload.
 
-             Func<<#=genArgs#>, ValueTask<TResult>> asyncSelector = (<#=string.Join(", ", Enumerable.Range(1, i).Select(k => "v" + k))#>) => new ValueTask<TResult>(selector(<#=string.Join(", ", Enumerable.Range(1, i).Select(k => "v" + k))#>));
 
-             return CombineLatest<<#=genPars#>>(observer, asyncSelector);
 
-         }
 
-         /// <summary>
 
-         /// Creates one observer per source; together they feed a single downstream observer with
 
-         /// the results of an asynchronous selector. Every handler does its work inside the same
 
-         /// AsyncGate lock. Once each source has produced at least one value, every incoming value
 
-         /// causes the selector to be awaited on the latest values and its result to be sent
 
-         /// downstream; an exception from the selector is sent downstream as an error instead.
 
-         /// Source errors are forwarded downstream as they arrive, and completion is forwarded once
 
-         /// all sources are done.
 
-         /// </summary>
 
-         /// <param name="observer">The downstream observer receiving the selector results.</param>
 
-         /// <param name="selector">Asynchronous function applied to the latest values of all sources.</param>
 
-         /// <returns>A tuple of observers, one for each source, in source order.</returns>
 
-         /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is null.</exception>
 
-         public static <#=res#> CombineLatest<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, ValueTask<TResult>> selector)
 
-         {
 
-             if (observer == null)
 
-                 throw new ArgumentNullException(nameof(observer));
 
-             if (selector == null)
 
-                 throw new ArgumentNullException(nameof(selector));
 
-             // Shared state captured by all per-source observers; only read or written while holding the gate.
 
-             // allHasValue caches the conjunction of the hasValue flags and is recomputed only on a first value.
 
-             bool allHasValue = false;
 
- <#
 
- for (var j = 1; j <= i; j++)
 
- {
 
- #>
 
-             bool hasValue<#=j#> = false;
 
-             bool isDone<#=j#> = false;
 
-             T<#=j#> latestValue<#=j#> = default(T<#=j#>);
 
- <#
 
- }
 
- #>
 
-             // NOTE(review): presumably AsyncGate serializes the handlers below - confirm against its implementation.
 
-             var gate = new AsyncGate();
 
-             return
 
-             (
 
- <#
 
- for (var j = 1; j <= i; j++)
 
- {
 
- #>
 
-                 Create<T<#=j#>>(
 
-                     // OnNext: record the value; run the selector if every source has a value, otherwise
 
-                     // complete downstream when all of the other sources are already done.
 
-                     async x =>
 
-                     {
 
-                         using (await gate.LockAsync().ConfigureAwait(false))
 
-                         {
 
-                             if (!hasValue<#=j#>)
 
-                             {
 
-                                 hasValue<#=j#> = true;
 
-                                 allHasValue = <#=string.Join(" && ", Enumerable.Range(1, i).Select(k => "hasValue" + k))#>;
 
-                             }
 
-                             latestValue<#=j#> = x;
 
-                             if (allHasValue)
 
-                             {
 
-                                 TResult res;
 
-                                 // Only the selector call is guarded; a failure is reported downstream and no value is emitted.
 
-                                 try
 
-                                 {
 
-                                     res = await selector(<#=string.Join(", ", Enumerable.Range(1, i).Select(k => "latestValue" + k))#>).ConfigureAwait(false);
 
-                                 }
 
-                                 catch (Exception ex)
 
-                                 {
 
-                                     await observer.OnErrorAsync(ex).ConfigureAwait(false);
 
-                                     return;
 
-                                 }
 
-                                 await observer.OnNextAsync(res).ConfigureAwait(false);
 
-                             }
 
-                             else if (<#=string.Join(" && ", Enumerable.Range(1, i).Where(k => k != j).Select(k => "isDone" + k))#>)
 
-                             {
 
-                                 await observer.OnCompletedAsync().ConfigureAwait(false);
 
-                             }
 
-                         }
 
-                     },
 
-                     // OnError: forward the error downstream while holding the gate.
 
-                     async ex =>
 
-                     {
 
-                         using (await gate.LockAsync().ConfigureAwait(false))
 
-                         {
 
-                             await observer.OnErrorAsync(ex).ConfigureAwait(false);
 
-                         }
 
-                     },
 
-                     // OnCompleted: mark this source as done; complete downstream once every source is done.
 
-                     async () =>
 
-                     {
 
-                         using (await gate.LockAsync().ConfigureAwait(false))
 
-                         {
 
-                             isDone<#=j#> = true;
 
-                             if (<#=string.Join(" && ", Enumerable.Range(1, i).Select(k => "isDone" + k))#>)
 
-                             {
 
-                                 await observer.OnCompletedAsync().ConfigureAwait(false);
 
-                             }
 
-                         }
 
-                     }
 
-                 )<#=(j < i ? "," : "")#>
 
- <#
 
- }
 
- #>
 
-             );
 
-         }
 
- <#
 
- }
 
- #>
 
-     }
 
- }
 
 
  |