// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    partial class AsyncObservable
    {
        // NOTE(review): the text these overloads were restored from had lost its line breaks (so the
        // license "//" swallowed the usings, the namespace/class headers and the first overloads) and
        // every generic parameter list. T1..TN were reconstructed from the tuple return types and the
        // Create<(T1, ...)> calls; the name TResult and the Task<TResult> selector return type are
        // assumptions. This file looks generated - confirm against the generator before relying on it.

        /// <summary>
        /// Zips two sources into an observable of <c>(T1, T2)</c> tuples. Combining elements is delegated
        /// to the per-source observers returned by <c>AsyncObserver.Zip</c>.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument is <c>null</c>.</exception>
        public static IAsyncObservable<(T1, T2)> Zip<T1, T2>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));

            return Create<(T1, T2)>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2) = AsyncObserver.Zip(observer);

                // Every subscription is started before any is awaited. Each continuation reads .Result only
                // after its subscribe task has finished, then adds the subscription to the composite.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips two sources into an observable of results. <paramref name="selector"/> is handed to
        /// <c>AsyncObserver.Zip</c>, which produces the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, Func<T1, T2, TResult> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips two sources into an observable of results using an asynchronous <paramref name="selector"/>,
        /// which is handed to <c>AsyncObserver.Zip</c> to produce the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, Func<T1, T2, Task<TResult>> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips three sources into an observable of <c>(T1, T2, T3)</c> tuples. Combining elements is
        /// delegated to the per-source observers returned by <c>AsyncObserver.Zip</c>.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument is <c>null</c>.</exception>
        public static IAsyncObservable<(T1, T2, T3)> Zip<T1, T2, T3>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));

            return Create<(T1, T2, T3)>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3) = AsyncObserver.Zip(observer);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips three sources into an observable of results. <paramref name="selector"/> is handed to
        /// <c>AsyncObserver.Zip</c>, which produces the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, Func<T1, T2, T3, TResult> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips three sources into an observable of results using an asynchronous <paramref name="selector"/>,
        /// which is handed to <c>AsyncObserver.Zip</c> to produce the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, Func<T1, T2, T3, Task<TResult>> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips four sources into an observable of <c>(T1, T2, T3, T4)</c> tuples. Combining elements is
        /// delegated to the per-source observers returned by <c>AsyncObserver.Zip</c>.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument is <c>null</c>.</exception>
        public static IAsyncObservable<(T1, T2, T3, T4)> Zip<T1, T2, T3, T4>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));

            return Create<(T1, T2, T3, T4)>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4) = AsyncObserver.Zip(observer);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips four sources into an observable of results. <paramref name="selector"/> is handed to
        /// <c>AsyncObserver.Zip</c>, which produces the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, Func<T1, T2, T3, T4, TResult> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4).ConfigureAwait(false);

                return d;
            });
        }
// Zip overloads: four sources (asynchronous selector) through nine sources (tuple and selector forms).
        // NOTE(review): the text these overloads were restored from had lost its line breaks and every
        // generic parameter list. T1..TN were reconstructed from the tuple return types and the
        // Create<(T1, ...)> calls; the name TResult and the Task<TResult> selector return type are
        // assumptions. This file looks generated - confirm against the generator before relying on it.

        /// <summary>
        /// Zips four sources into an observable of results using an asynchronous <paramref name="selector"/>,
        /// which is handed to <c>AsyncObserver.Zip</c> to produce the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, Func<T1, T2, T3, T4, Task<TResult>> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4) = AsyncObserver.Zip(observer, selector);

                // Every subscription is started before any is awaited. Each continuation reads .Result only
                // after its subscribe task has finished, then adds the subscription to the composite.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips five sources into an observable of <c>(T1, T2, T3, T4, T5)</c> tuples. Combining elements is
        /// delegated to the per-source observers returned by <c>AsyncObserver.Zip</c>.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument is <c>null</c>.</exception>
        public static IAsyncObservable<(T1, T2, T3, T4, T5)> Zip<T1, T2, T3, T4, T5>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));

            return Create<(T1, T2, T3, T4, T5)>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5) = AsyncObserver.Zip(observer);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips five sources into an observable of results. <paramref name="selector"/> is handed to
        /// <c>AsyncObserver.Zip</c>, which produces the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, T5, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, Func<T1, T2, T3, T4, T5, TResult> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips five sources into an observable of results using an asynchronous <paramref name="selector"/>,
        /// which is handed to <c>AsyncObserver.Zip</c> to produce the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, T5, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, Func<T1, T2, T3, T4, T5, Task<TResult>> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips six sources into an observable of <c>(T1, T2, T3, T4, T5, T6)</c> tuples. Combining elements
        /// is delegated to the per-source observers returned by <c>AsyncObserver.Zip</c>.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument is <c>null</c>.</exception>
        public static IAsyncObservable<(T1, T2, T3, T4, T5, T6)> Zip<T1, T2, T3, T4, T5, T6>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));

            return Create<(T1, T2, T3, T4, T5, T6)>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6) = AsyncObserver.Zip(observer);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips six sources into an observable of results. <paramref name="selector"/> is handed to
        /// <c>AsyncObserver.Zip</c>, which produces the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, T5, T6, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, Func<T1, T2, T3, T4, T5, T6, TResult> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips six sources into an observable of results using an asynchronous <paramref name="selector"/>,
        /// which is handed to <c>AsyncObserver.Zip</c> to produce the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, T5, T6, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, Func<T1, T2, T3, T4, T5, T6, Task<TResult>> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips seven sources into an observable of <c>(T1, T2, T3, T4, T5, T6, T7)</c> tuples. Combining
        /// elements is delegated to the per-source observers returned by <c>AsyncObserver.Zip</c>.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument is <c>null</c>.</exception>
        public static IAsyncObservable<(T1, T2, T3, T4, T5, T6, T7)> Zip<T1, T2, T3, T4, T5, T6, T7>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, IAsyncObservable<T7> source7)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (source7 == null)
                throw new ArgumentNullException(nameof(source7));

            return Create<(T1, T2, T3, T4, T5, T6, T7)>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7) = AsyncObserver.Zip(observer);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips seven sources into an observable of results. <paramref name="selector"/> is handed to
        /// <c>AsyncObserver.Zip</c>, which produces the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, T5, T6, T7, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, IAsyncObservable<T7> source7, Func<T1, T2, T3, T4, T5, T6, T7, TResult> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (source7 == null)
                throw new ArgumentNullException(nameof(source7));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips seven sources into an observable of results using an asynchronous <paramref name="selector"/>,
        /// which is handed to <c>AsyncObserver.Zip</c> to produce the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, T5, T6, T7, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, IAsyncObservable<T7> source7, Func<T1, T2, T3, T4, T5, T6, T7, Task<TResult>> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (source7 == null)
                throw new ArgumentNullException(nameof(source7));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips eight sources into an observable of <c>(T1, T2, T3, T4, T5, T6, T7, T8)</c> tuples. Combining
        /// elements is delegated to the per-source observers returned by <c>AsyncObserver.Zip</c>.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument is <c>null</c>.</exception>
        public static IAsyncObservable<(T1, T2, T3, T4, T5, T6, T7, T8)> Zip<T1, T2, T3, T4, T5, T6, T7, T8>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, IAsyncObservable<T7> source7, IAsyncObservable<T8> source8)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (source7 == null)
                throw new ArgumentNullException(nameof(source7));
            if (source8 == null)
                throw new ArgumentNullException(nameof(source8));

            return Create<(T1, T2, T3, T4, T5, T6, T7, T8)>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8) = AsyncObserver.Zip(observer);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips eight sources into an observable of results. <paramref name="selector"/> is handed to
        /// <c>AsyncObserver.Zip</c>, which produces the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, IAsyncObservable<T7> source7, IAsyncObservable<T8> source8, Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (source7 == null)
                throw new ArgumentNullException(nameof(source7));
            if (source8 == null)
                throw new ArgumentNullException(nameof(source8));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips eight sources into an observable of results using an asynchronous <paramref name="selector"/>,
        /// which is handed to <c>AsyncObserver.Zip</c> to produce the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, IAsyncObservable<T7> source7, IAsyncObservable<T8> source8, Func<T1, T2, T3, T4, T5, T6, T7, T8, Task<TResult>> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (source7 == null)
                throw new ArgumentNullException(nameof(source7));
            if (source8 == null)
                throw new ArgumentNullException(nameof(source8));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips nine sources into an observable of <c>(T1, T2, T3, T4, T5, T6, T7, T8, T9)</c> tuples.
        /// Combining elements is delegated to the per-source observers returned by <c>AsyncObserver.Zip</c>.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument is <c>null</c>.</exception>
        public static IAsyncObservable<(T1, T2, T3, T4, T5, T6, T7, T8, T9)> Zip<T1, T2, T3, T4, T5, T6, T7, T8, T9>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, IAsyncObservable<T7> source7, IAsyncObservable<T8> source8, IAsyncObservable<T9> source9)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (source7 == null)
                throw new ArgumentNullException(nameof(source7));
            if (source8 == null)
                throw new ArgumentNullException(nameof(source8));
            if (source9 == null)
                throw new ArgumentNullException(nameof(source9));

            return Create<(T1, T2, T3, T4, T5, T6, T7, T8, T9)>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9) = AsyncObserver.Zip(observer);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9).ConfigureAwait(false);

                return d;
            });
        }

        /// <summary>
        /// Zips nine sources into an observable of results. <paramref name="selector"/> is handed to
        /// <c>AsyncObserver.Zip</c>, which produces the per-source observers.
        /// </summary>
        /// <returns>An observable whose subscription holds the subscriptions to every source.</returns>
        /// <exception cref="ArgumentNullException">Any source argument or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> Zip<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(this IAsyncObservable<T1> source1, IAsyncObservable<T2> source2, IAsyncObservable<T3> source3, IAsyncObservable<T4> source4, IAsyncObservable<T5> source5, IAsyncObservable<T6> source6, IAsyncObservable<T7> source7, IAsyncObservable<T8> source8, IAsyncObservable<T9> source9, Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> selector)
        {
            if (source1 == null)
                throw new ArgumentNullException(nameof(source1));
            if (source2 == null)
                throw new ArgumentNullException(nameof(source2));
            if (source3 == null)
                throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
                throw new ArgumentNullException(nameof(source4));
            if (source5 == null)
                throw new ArgumentNullException(nameof(source5));
            if (source6 == null)
                throw new ArgumentNullException(nameof(source6));
            if (source7 == null)
                throw new ArgumentNullException(nameof(source7));
            if (source8 == null)
                throw new ArgumentNullException(nameof(source8));
            if (source9 == null)
                throw new ArgumentNullException(nameof(source9));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Create<TResult>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9) = AsyncObserver.Zip(observer, selector);

                // Start every subscription first; each one is added to the composite once it completes.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9).ConfigureAwait(false);

                return d;
            });
        }

        // NOTE(review): the overload starting below runs past the reviewed range, so its signature is
        // left exactly as found (generic parameter lists still missing) rather than reconstructed.
        public static IAsyncObservable Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8, IAsyncObservable source9, Func> selector)
{ if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9) = AsyncObserver.Zip(observer, selector); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await 
Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9).ConfigureAwait(false); return d; }); } public static IAsyncObservable<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10)> Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8, IAsyncObservable source9, IAsyncObservable source10) { if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (source10 == null) throw new ArgumentNullException(nameof(source10)); return Create<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10)>(async observer => { var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10) = AsyncObserver.Zip(observer); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => 
d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10).ConfigureAwait(false); return d; }); } public static IAsyncObservable Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8, IAsyncObservable source9, IAsyncObservable source10, Func selector) { if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (source10 == null) throw new ArgumentNullException(nameof(source10)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, 
observer7, observer8, observer9, observer10) = AsyncObserver.Zip(observer, selector); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10).ConfigureAwait(false); return d; }); } public static IAsyncObservable Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8, IAsyncObservable source9, IAsyncObservable source10, Func> selector) { if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == 
null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (source10 == null) throw new ArgumentNullException(nameof(source10)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10) = AsyncObserver.Zip(observer, selector); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10).ConfigureAwait(false); return 
d; }); } public static IAsyncObservable<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11)> Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8, IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11) { if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (source10 == null) throw new ArgumentNullException(nameof(source10)); if (source11 == null) throw new ArgumentNullException(nameof(source11)); return Create<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11)>(async observer => { var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11) = AsyncObserver.Zip(observer); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => 
d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11).ConfigureAwait(false); return d; }); } public static IAsyncObservable Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8, IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, Func selector) { if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (source10 == null) throw new ArgumentNullException(nameof(source10)); if (source11 == null) throw new 
ArgumentNullException(nameof(source11)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11) = AsyncObserver.Zip(observer, selector); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11).ConfigureAwait(false); return d; }); } public static IAsyncObservable Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, 
IAsyncObservable source8, IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, Func> selector) { if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (source10 == null) throw new ArgumentNullException(nameof(source10)); if (source11 == null) throw new ArgumentNullException(nameof(source11)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11) = AsyncObserver.Zip(observer, selector); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable 
=> d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11).ConfigureAwait(false); return d; }); } public static IAsyncObservable<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12)> Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8, IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12) { if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (source10 == null) throw new ArgumentNullException(nameof(source10)); if (source11 == null) throw new ArgumentNullException(nameof(source11)); if (source12 == null) throw new ArgumentNullException(nameof(source12)); return Create<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12)>(async observer => { 
var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12) = AsyncObserver.Zip(observer); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12).ConfigureAwait(false); return d; }); } public static IAsyncObservable Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable 
source8, IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12, Func selector) { if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (source10 == null) throw new ArgumentNullException(nameof(source10)); if (source11 == null) throw new ArgumentNullException(nameof(source11)); if (source12 == null) throw new ArgumentNullException(nameof(source12)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12) = AsyncObserver.Zip(observer, selector); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => 
d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12).ConfigureAwait(false); return d; }); } public static IAsyncObservable Zip(this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4, IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8, IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12, Func> selector) { if (source1 == null) throw new ArgumentNullException(nameof(source1)); if (source2 == null) throw new ArgumentNullException(nameof(source2)); if (source3 == null) throw new ArgumentNullException(nameof(source3)); if (source4 == null) throw new ArgumentNullException(nameof(source4)); if (source5 == null) throw new ArgumentNullException(nameof(source5)); if (source6 == null) throw new ArgumentNullException(nameof(source6)); if (source7 == null) throw new ArgumentNullException(nameof(source7)); if (source8 == null) throw new ArgumentNullException(nameof(source8)); if (source9 == null) throw new ArgumentNullException(nameof(source9)); if (source10 == null) throw new ArgumentNullException(nameof(source10)); if (source11 == null) throw 
new ArgumentNullException(nameof(source11)); if (source12 == null) throw new ArgumentNullException(nameof(source12)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var d = new CompositeAsyncDisposable(); var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12) = AsyncObserver.Zip(observer, selector); var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap(); await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12).ConfigureAwait(false); return d; }); } public 
static IAsyncObservable<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13)> Zip(
            this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4,
            IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8,
            IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12,
            IAsyncObservable source13)
        {
            // Thirteen-source overload producing tuples. Every source is validated eagerly,
            // the actual subscriptions happen when the returned observable is subscribed to.
            if (source1 == null) throw new ArgumentNullException(nameof(source1));
            if (source2 == null) throw new ArgumentNullException(nameof(source2));
            if (source3 == null) throw new ArgumentNullException(nameof(source3));
            if (source4 == null) throw new ArgumentNullException(nameof(source4));
            if (source5 == null) throw new ArgumentNullException(nameof(source5));
            if (source6 == null) throw new ArgumentNullException(nameof(source6));
            if (source7 == null) throw new ArgumentNullException(nameof(source7));
            if (source8 == null) throw new ArgumentNullException(nameof(source8));
            if (source9 == null) throw new ArgumentNullException(nameof(source9));
            if (source10 == null) throw new ArgumentNullException(nameof(source10));
            if (source11 == null) throw new ArgumentNullException(nameof(source11));
            if (source12 == null) throw new ArgumentNullException(nameof(source12));
            if (source13 == null) throw new ArgumentNullException(nameof(source13));

            return Create<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13)>(async observer =>
            {
                // Collects one subscription per source; returned as the handle for the whole zip.
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12, observer13) = AsyncObserver.Zip(observer);

                // Each task subscribes one source and then registers the resulting subscription in d.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub13 = source13.SubscribeSafeAsync(observer13).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                try
                {
                    await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12, sub13).ConfigureAwait(false);
                }
                catch
                {
                    // A subscription failed: release the ones already added to d before rethrowing,
                    // otherwise they stay attached with nothing left to dispose them.
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }

        /// <summary>
        /// Zips thirteen asynchronous sources, handing <paramref name="selector"/> to
        /// <c>AsyncObserver.Zip</c> to build each result element.
        /// </summary>
        /// <exception cref="ArgumentNullException">Any source or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable Zip(
            this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4,
            IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8,
            IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12,
            IAsyncObservable source13, Func selector)
        {
            if (source1 == null) throw new ArgumentNullException(nameof(source1));
            if (source2 == null) throw new ArgumentNullException(nameof(source2));
            if (source3 == null) throw new ArgumentNullException(nameof(source3));
            if (source4 == null)
throw new ArgumentNullException(nameof(source4));
            if (source5 == null) throw new ArgumentNullException(nameof(source5));
            if (source6 == null) throw new ArgumentNullException(nameof(source6));
            if (source7 == null) throw new ArgumentNullException(nameof(source7));
            if (source8 == null) throw new ArgumentNullException(nameof(source8));
            if (source9 == null) throw new ArgumentNullException(nameof(source9));
            if (source10 == null) throw new ArgumentNullException(nameof(source10));
            if (source11 == null) throw new ArgumentNullException(nameof(source11));
            if (source12 == null) throw new ArgumentNullException(nameof(source12));
            if (source13 == null) throw new ArgumentNullException(nameof(source13));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return Create(async observer =>
            {
                // Collects one subscription per source; returned as the handle for the whole zip.
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12, observer13) = AsyncObserver.Zip(observer, selector);

                // Each task subscribes one source and then registers the resulting subscription in d.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub13 = source13.SubscribeSafeAsync(observer13).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                try
                {
                    await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12, sub13).ConfigureAwait(false);
                }
                catch
                {
                    // A subscription failed: release the ones already added to d before rethrowing,
                    // otherwise they stay attached with nothing left to dispose them.
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }

        /// <summary>
        /// Zips thirteen asynchronous sources, handing <paramref name="selector"/> to
        /// <c>AsyncObserver.Zip</c> to build each result element.
        /// </summary>
        /// <remarks>
        /// NOTE(review): presumably the task-returning selector variant; the type arguments are
        /// not legible in this file, confirm against the matching <c>AsyncObserver.Zip</c> overload.
        /// </remarks>
        /// <exception cref="ArgumentNullException">Any source or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable Zip(
            this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4,
            IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8,
            IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12,
            IAsyncObservable source13, Func> selector)
        {
            if (source1 == null) throw new ArgumentNullException(nameof(source1));
            if (source2 == null) throw new ArgumentNullException(nameof(source2));
            if (source3 == null) throw new ArgumentNullException(nameof(source3));
            if (source4 == null) throw new ArgumentNullException(nameof(source4));
            if (source5 == null) throw new ArgumentNullException(nameof(source5));
            if (source6 == null) throw new ArgumentNullException(nameof(source6));
            if (source7 == null) throw new ArgumentNullException(nameof(source7));
            if (source8 == null) throw new ArgumentNullException(nameof(source8));
            if (source9 == null) throw new ArgumentNullException(nameof(source9));
            if (source10 == null) throw new ArgumentNullException(nameof(source10));
            if (source11 == null) throw new ArgumentNullException(nameof(source11));
            if (source12 == null) throw new ArgumentNullException(nameof(source12));
            if (source13 ==
null) throw new ArgumentNullException(nameof(source13));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return Create(async observer =>
            {
                // Collects one subscription per source; returned as the handle for the whole zip.
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12, observer13) = AsyncObserver.Zip(observer, selector);

                // Each task subscribes one source and then registers the resulting subscription in d.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub13 = source13.SubscribeSafeAsync(observer13).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                try
                {
                    await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12, sub13).ConfigureAwait(false);
                }
                catch
                {
                    // A subscription failed: release the ones already added to d before rethrowing,
                    // otherwise they stay attached with nothing left to dispose them.
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }

        /// <summary>
        /// Zips fourteen asynchronous sources into tuples, using the observers produced by
        /// <c>AsyncObserver.Zip</c> for the downstream tuple observer.
        /// </summary>
        /// <exception cref="ArgumentNullException">Any source is null.</exception>
        public static IAsyncObservable<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14)> Zip(
            this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4,
            IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8,
            IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12,
            IAsyncObservable source13, IAsyncObservable source14)
        {
            if (source1 == null) throw new ArgumentNullException(nameof(source1));
            if (source2 == null) throw new ArgumentNullException(nameof(source2));
            if (source3 == null) throw new ArgumentNullException(nameof(source3));
            if (source4 == null) throw new ArgumentNullException(nameof(source4));
            if (source5 == null) throw new ArgumentNullException(nameof(source5));
            if (source6 == null) throw new ArgumentNullException(nameof(source6));
            if (source7 == null) throw new ArgumentNullException(nameof(source7));
            if (source8 == null) throw new ArgumentNullException(nameof(source8));
            if (source9 == null) throw new ArgumentNullException(nameof(source9));
            if (source10 == null) throw new ArgumentNullException(nameof(source10));
            if (source11 == null) throw new ArgumentNullException(nameof(source11));
            if (source12 == null) throw new ArgumentNullException(nameof(source12));
            if (source13 == null) throw new ArgumentNullException(nameof(source13));
            if (source14 == null) throw new ArgumentNullException(nameof(source14));

            return Create<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14)>(async observer =>
            {
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12, observer13, observer14) = AsyncObserver.Zip(observer);

                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub13 = source13.SubscribeSafeAsync(observer13).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub14 = source14.SubscribeSafeAsync(observer14).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                try
                {
                    await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12, sub13, sub14).ConfigureAwait(false);
                }
                catch
                {
                    // A subscription failed: release the ones already added to d before rethrowing,
                    // otherwise they stay attached with nothing left to dispose them.
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }

        /// <summary>
        /// Zips fourteen asynchronous sources, handing <paramref name="selector"/> to
        /// <c>AsyncObserver.Zip</c> to build each result element.
        /// </summary>
        /// <exception cref="ArgumentNullException">Any source or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable Zip(
            this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4,
            IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8,
            IAsyncObservable source9, IAsyncObservable source10,
IAsyncObservable source11, IAsyncObservable source12, IAsyncObservable source13, IAsyncObservable source14, Func selector)
        {
            if (source1 == null) throw new ArgumentNullException(nameof(source1));
            if (source2 == null) throw new ArgumentNullException(nameof(source2));
            if (source3 == null) throw new ArgumentNullException(nameof(source3));
            if (source4 == null) throw new ArgumentNullException(nameof(source4));
            if (source5 == null) throw new ArgumentNullException(nameof(source5));
            if (source6 == null) throw new ArgumentNullException(nameof(source6));
            if (source7 == null) throw new ArgumentNullException(nameof(source7));
            if (source8 == null) throw new ArgumentNullException(nameof(source8));
            if (source9 == null) throw new ArgumentNullException(nameof(source9));
            if (source10 == null) throw new ArgumentNullException(nameof(source10));
            if (source11 == null) throw new ArgumentNullException(nameof(source11));
            if (source12 == null) throw new ArgumentNullException(nameof(source12));
            if (source13 == null) throw new ArgumentNullException(nameof(source13));
            if (source14 == null) throw new ArgumentNullException(nameof(source14));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return Create(async observer =>
            {
                // Collects one subscription per source; returned as the handle for the whole zip.
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12, observer13, observer14) = AsyncObserver.Zip(observer, selector);

                // Each task subscribes one source and then registers the resulting subscription in d.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub13 = source13.SubscribeSafeAsync(observer13).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub14 = source14.SubscribeSafeAsync(observer14).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                try
                {
                    await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12, sub13, sub14).ConfigureAwait(false);
                }
                catch
                {
                    // A subscription failed: release the ones already added to d before rethrowing,
                    // otherwise they stay attached with nothing left to dispose them.
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }

        /// <summary>
        /// Zips fourteen asynchronous sources, handing <paramref name="selector"/> to
        /// <c>AsyncObserver.Zip</c> to build each result element.
        /// </summary>
        /// <remarks>
        /// NOTE(review): presumably the task-returning selector variant; the type arguments are
        /// not legible in this file, confirm against the matching <c>AsyncObserver.Zip</c> overload.
        /// </remarks>
        /// <exception cref="ArgumentNullException">Any source or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable Zip(
            this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4,
            IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8,
            IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12,
            IAsyncObservable source13, IAsyncObservable source14, Func> selector)
        {
            if (source1 == null) throw new ArgumentNullException(nameof(source1));
            if (source2 == null) throw new ArgumentNullException(nameof(source2));
            if (source3 == null) throw new ArgumentNullException(nameof(source3));
            if (source4 == null) throw new
ArgumentNullException(nameof(source4));
            if (source5 == null) throw new ArgumentNullException(nameof(source5));
            if (source6 == null) throw new ArgumentNullException(nameof(source6));
            if (source7 == null) throw new ArgumentNullException(nameof(source7));
            if (source8 == null) throw new ArgumentNullException(nameof(source8));
            if (source9 == null) throw new ArgumentNullException(nameof(source9));
            if (source10 == null) throw new ArgumentNullException(nameof(source10));
            if (source11 == null) throw new ArgumentNullException(nameof(source11));
            if (source12 == null) throw new ArgumentNullException(nameof(source12));
            if (source13 == null) throw new ArgumentNullException(nameof(source13));
            if (source14 == null) throw new ArgumentNullException(nameof(source14));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return Create(async observer =>
            {
                // Collects one subscription per source; returned as the handle for the whole zip.
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12, observer13, observer14) = AsyncObserver.Zip(observer, selector);

                // Each task subscribes one source and then registers the resulting subscription in d.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub13 = source13.SubscribeSafeAsync(observer13).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub14 = source14.SubscribeSafeAsync(observer14).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                try
                {
                    await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12, sub13, sub14).ConfigureAwait(false);
                }
                catch
                {
                    // A subscription failed: release the ones already added to d before rethrowing,
                    // otherwise they stay attached with nothing left to dispose them.
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }

        /// <summary>
        /// Zips fifteen asynchronous sources into tuples, using the observers produced by
        /// <c>AsyncObserver.Zip</c> for the downstream tuple observer.
        /// </summary>
        /// <exception cref="ArgumentNullException">Any source is null.</exception>
        public static IAsyncObservable<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15)> Zip(
            this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4,
            IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8,
            IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12,
            IAsyncObservable source13, IAsyncObservable source14, IAsyncObservable source15)
        {
            if (source1 == null) throw new ArgumentNullException(nameof(source1));
            if (source2 == null) throw new ArgumentNullException(nameof(source2));
            if (source3 == null) throw new ArgumentNullException(nameof(source3));
            if (source4 == null) throw new ArgumentNullException(nameof(source4));
            if (source5 == null) throw new ArgumentNullException(nameof(source5));
            if (source6 == null) throw new ArgumentNullException(nameof(source6));
            if (source7 == null) throw new ArgumentNullException(nameof(source7));
            if (source8 == null) throw
new ArgumentNullException(nameof(source8));
            if (source9 == null) throw new ArgumentNullException(nameof(source9));
            if (source10 == null) throw new ArgumentNullException(nameof(source10));
            if (source11 == null) throw new ArgumentNullException(nameof(source11));
            if (source12 == null) throw new ArgumentNullException(nameof(source12));
            if (source13 == null) throw new ArgumentNullException(nameof(source13));
            if (source14 == null) throw new ArgumentNullException(nameof(source14));
            if (source15 == null) throw new ArgumentNullException(nameof(source15));

            return Create<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15)>(async observer =>
            {
                // Collects one subscription per source; returned as the handle for the whole zip.
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12, observer13, observer14, observer15) = AsyncObserver.Zip(observer);

                // Each task subscribes one source and then registers the resulting subscription in d.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub13 = source13.SubscribeSafeAsync(observer13).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub14 = source14.SubscribeSafeAsync(observer14).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub15 = source15.SubscribeSafeAsync(observer15).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                try
                {
                    await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12, sub13, sub14, sub15).ConfigureAwait(false);
                }
                catch
                {
                    // A subscription failed: release the ones already added to d before rethrowing,
                    // otherwise they stay attached with nothing left to dispose them.
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }

        /// <summary>
        /// Zips fifteen asynchronous sources, handing <paramref name="selector"/> to
        /// <c>AsyncObserver.Zip</c> to build each result element.
        /// </summary>
        /// <exception cref="ArgumentNullException">Any source or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable Zip(
            this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4,
            IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8,
            IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12,
            IAsyncObservable source13, IAsyncObservable source14, IAsyncObservable source15, Func selector)
        {
            if (source1 == null) throw new ArgumentNullException(nameof(source1));
            if (source2 == null) throw new ArgumentNullException(nameof(source2));
            if (source3 == null) throw new ArgumentNullException(nameof(source3));
            if (source4 == null) throw new ArgumentNullException(nameof(source4));
            if (source5 == null) throw new ArgumentNullException(nameof(source5));
            if (source6 == null) throw new ArgumentNullException(nameof(source6));
            if (source7 == null) throw new ArgumentNullException(nameof(source7));
            if (source8 == null) throw new ArgumentNullException(nameof(source8));
            if (source9 == null) throw new ArgumentNullException(nameof(source9));
            if (source10 == null) throw new
ArgumentNullException(nameof(source10));
            if (source11 == null) throw new ArgumentNullException(nameof(source11));
            if (source12 == null) throw new ArgumentNullException(nameof(source12));
            if (source13 == null) throw new ArgumentNullException(nameof(source13));
            if (source14 == null) throw new ArgumentNullException(nameof(source14));
            if (source15 == null) throw new ArgumentNullException(nameof(source15));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return Create(async observer =>
            {
                // Collects one subscription per source; returned as the handle for the whole zip.
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12, observer13, observer14, observer15) = AsyncObserver.Zip(observer, selector);

                // Each task subscribes one source and then registers the resulting subscription in d.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub13 = source13.SubscribeSafeAsync(observer13).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub14 = source14.SubscribeSafeAsync(observer14).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub15 = source15.SubscribeSafeAsync(observer15).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                try
                {
                    await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12, sub13, sub14, sub15).ConfigureAwait(false);
                }
                catch
                {
                    // A subscription failed: release the ones already added to d before rethrowing,
                    // otherwise they stay attached with nothing left to dispose them.
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }

        /// <summary>
        /// Zips fifteen asynchronous sources, handing <paramref name="selector"/> to
        /// <c>AsyncObserver.Zip</c> to build each result element.
        /// </summary>
        /// <remarks>
        /// NOTE(review): presumably the task-returning selector variant; the type arguments are
        /// not legible in this file, confirm against the matching <c>AsyncObserver.Zip</c> overload.
        /// </remarks>
        /// <exception cref="ArgumentNullException">Any source or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable Zip(
            this IAsyncObservable source1, IAsyncObservable source2, IAsyncObservable source3, IAsyncObservable source4,
            IAsyncObservable source5, IAsyncObservable source6, IAsyncObservable source7, IAsyncObservable source8,
            IAsyncObservable source9, IAsyncObservable source10, IAsyncObservable source11, IAsyncObservable source12,
            IAsyncObservable source13, IAsyncObservable source14, IAsyncObservable source15, Func> selector)
        {
            if (source1 == null) throw new ArgumentNullException(nameof(source1));
            if (source2 == null) throw new ArgumentNullException(nameof(source2));
            if (source3 == null) throw new ArgumentNullException(nameof(source3));
            if (source4 == null) throw new ArgumentNullException(nameof(source4));
            if (source5 == null) throw new ArgumentNullException(nameof(source5));
            if (source6 == null) throw new ArgumentNullException(nameof(source6));
            if (source7 == null) throw new ArgumentNullException(nameof(source7));
            if (source8 == null) throw new ArgumentNullException(nameof(source8));
            if (source9 == null) throw new ArgumentNullException(nameof(source9));
            if (source10 == null) throw new ArgumentNullException(nameof(source10));
            if (source11 == null) throw new ArgumentNullException(nameof(source11));
            if (source12 ==
null) throw new ArgumentNullException(nameof(source12));
            if (source13 == null) throw new ArgumentNullException(nameof(source13));
            if (source14 == null) throw new ArgumentNullException(nameof(source14));
            if (source15 == null) throw new ArgumentNullException(nameof(source15));
            if (selector == null) throw new ArgumentNullException(nameof(selector));

            return Create(async observer =>
            {
                // Collects one subscription per source; returned as the handle for the whole zip.
                var d = new CompositeAsyncDisposable();

                var (observer1, observer2, observer3, observer4, observer5, observer6, observer7, observer8, observer9, observer10, observer11, observer12, observer13, observer14, observer15) = AsyncObserver.Zip(observer, selector);

                // Each task subscribes one source and then registers the resulting subscription in d.
                var sub1 = source1.SubscribeSafeAsync(observer1).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub2 = source2.SubscribeSafeAsync(observer2).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub3 = source3.SubscribeSafeAsync(observer3).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub4 = source4.SubscribeSafeAsync(observer4).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub5 = source5.SubscribeSafeAsync(observer5).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub6 = source6.SubscribeSafeAsync(observer6).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub7 = source7.SubscribeSafeAsync(observer7).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub8 = source8.SubscribeSafeAsync(observer8).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub9 = source9.SubscribeSafeAsync(observer9).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub10 = source10.SubscribeSafeAsync(observer10).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub11 = source11.SubscribeSafeAsync(observer11).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub12 = source12.SubscribeSafeAsync(observer12).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub13 = source13.SubscribeSafeAsync(observer13).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub14 = source14.SubscribeSafeAsync(observer14).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
                var sub15 = source15.SubscribeSafeAsync(observer15).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();

                try
                {
                    await Task.WhenAll(sub1, sub2, sub3, sub4, sub5, sub6, sub7, sub8, sub9, sub10, sub11, sub12, sub13, sub14, sub15).ConfigureAwait(false);
                }
                catch
                {
                    // A subscription failed: release the ones already added to d before rethrowing,
                    // otherwise they stay attached with nothing left to dispose them.
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }
    }

    partial class AsyncObserver
    {
        /// <summary>
        /// Builds two upstream observers that queue incoming values under a shared lock and push
        /// a tuple to <paramref name="observer"/> whenever both queues hold at least one value.
        /// </summary>
        /// <remarks>
        /// Errors from either side are forwarded as they arrive. Completion is forwarded once every
        /// side has completed, or when a value arrives that cannot be paired and all other sides
        /// have already completed.
        /// </remarks>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public static (IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2)> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var isDone = new bool[2];

            // index is the zero-based slot of this side in isDone.
            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0)
                            {
                                await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue())).ConfigureAwait(false);
                            }
                            else
                            {
                                // No pair available: if every other side is finished, none can ever form.
                                var allDone = true;

                                for (var i = 0; i < 2; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 2; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Slots must be 0 and 1: isDone has two entries, so the former 1/2 numbering threw
            // IndexOutOfRangeException on completion of the second side and never marked slot 0.
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2)
            );
        }

        /// <summary>
        /// Two-source zip with a synchronous <paramref name="selector"/>; wraps the selector result
        /// in a completed task and defers to the task-returning overload.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is null.</exception>
        public static (IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector)
        {
            if (observer == null) throw
new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Zip(observer, (x1, x2) => Task.FromResult(selector(x1, x2)));
        }

        /// <summary>
        /// Creates one observer per source for a 2-way zip whose results are produced by a task-returning
        /// <paramref name="selector"/>. An exception thrown by the selector is forwarded through
        /// <c>OnErrorAsync</c> instead of producing an element.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results, errors and completion.</param>
        /// <param name="selector">Asynchronous function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();

            var isDone = new bool[2];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0)
                            {
                                TResult res;

                                try
                                {
                                    res = await selector(values1.Dequeue(), values2.Dequeue()).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                await observer.OnNextAsync(res).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 2; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 2; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 2).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 3-way zip. Each observer queues its elements; whenever every
        /// queue is non-empty, one element is dequeued from each and forwarded to <paramref name="observer"/> as a tuple.
        /// </summary>
        /// <param name="observer">Observer that receives the zipped tuples, errors and completion.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3)> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();

            var isDone = new bool[3];

            IAsyncObserver CreateObserver(int
index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0)
                            {
                                await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue())).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 3; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 3; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 3).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 3-way zip whose results are produced by a synchronous
        /// <paramref name="selector"/>. Delegates to the overload taking a task-returning selector by wrapping
        /// the result with <c>Task.FromResult</c>.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results.</param>
        /// <param name="selector">Function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Zip(observer, (x1, x2, x3) => Task.FromResult(selector(x1, x2, x3)));
        }

        /// <summary>
        /// Creates one observer per source for a 3-way zip whose results are produced by a task-returning
        /// <paramref name="selector"/>. An exception thrown by the selector is forwarded through
        /// <c>OnErrorAsync</c> instead of producing an element.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results, errors and completion.</param>
        /// <param name="selector">Asynchronous function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();

            var isDone = new bool[3];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0)
                            {
                                TResult res;

                                try
                                {
                                    res = await selector(values1.Dequeue(),
values2.Dequeue(), values3.Dequeue()).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                await observer.OnNextAsync(res).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 3; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 3; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 3).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 4-way zip. Each observer queues its elements; whenever every
        /// queue is non-empty, one element is dequeued from each and forwarded to <paramref name="observer"/> as a tuple.
        /// </summary>
        /// <param name="observer">Observer that receives the zipped tuples, errors and completion.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4)> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();

            var isDone = new bool[4];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0)
                            {
                                await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue())).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 4; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 4; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 4).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 4-way zip whose results are produced by a synchronous
        /// <paramref name="selector"/>. Delegates to the overload taking a task-returning selector by wrapping
        /// the result with <c>Task.FromResult</c>.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results.</param>
        /// <param name="selector">Function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Zip(observer, (x1, x2, x3, x4) => Task.FromResult(selector(x1, x2, x3, x4)));
        }

        /// <summary>
        /// Creates one observer per source for a 4-way zip whose results are produced by a task-returning
        /// <paramref name="selector"/>. An exception thrown by the selector is forwarded through
        /// <c>OnErrorAsync</c> instead of producing an element.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results, errors and completion.</param>
        /// <param name="selector">Asynchronous function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();

            var isDone = new bool[4];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0)
                            {
                                TResult res;

                                try
                                {
                                    res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue()).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                await observer.OnNextAsync(res).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 4; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await
observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 4; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 4).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 5-way zip. Each observer queues its elements; whenever every
        /// queue is non-empty, one element is dequeued from each and forwarded to <paramref name="observer"/> as a tuple.
        /// </summary>
        /// <param name="observer">Observer that receives the zipped tuples, errors and completion.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5)> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();

            var isDone = new bool[5];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0)
                            {
                                await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue())).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 5; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 5; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 5).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 5-way zip whose results are produced by a synchronous
        /// <paramref name="selector"/>. Delegates to the overload taking a task-returning selector by wrapping
        /// the result with <c>Task.FromResult</c>.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results.</param>
        /// <param name="selector">Function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Zip(observer, (x1, x2, x3, x4, x5) => Task.FromResult(selector(x1, x2, x3, x4, x5)));
        }

        /// <summary>
        /// Creates one observer per source for a 5-way zip whose results are produced by a task-returning
        /// <paramref name="selector"/>. An exception thrown by the selector is forwarded through
        /// <c>OnErrorAsync</c> instead of producing an element.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results, errors and completion.</param>
        /// <param name="selector">Asynchronous function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();

            var isDone = new bool[5];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0)
                            {
                                TResult res;

                                try
                                {
                                    res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue()).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                await observer.OnNextAsync(res).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 5; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 5; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await
observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 5).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 6-way zip. Each observer queues its elements; whenever every
        /// queue is non-empty, one element is dequeued from each and forwarded to <paramref name="observer"/> as a tuple.
        /// </summary>
        /// <param name="observer">Observer that receives the zipped tuples, errors and completion.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5, T6)> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();
            var values6 = new Queue();

            var isDone = new bool[6];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0)
                            {
                                await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue())).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 6; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 6; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 6).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5),
                CreateObserver(5, values6)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 6-way zip whose results are produced by a synchronous
        /// <paramref name="selector"/>. Delegates to the overload taking a task-returning selector by wrapping
        /// the result with <c>Task.FromResult</c>.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results.</param>
        /// <param name="selector">Function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Zip(observer, (x1, x2, x3, x4, x5, x6) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6)));
        }

        /// <summary>
        /// Creates one observer per source for a 6-way zip whose results are produced by a task-returning
        /// <paramref name="selector"/>. An exception thrown by the selector is forwarded through
        /// <c>OnErrorAsync</c> instead of producing an element.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results, errors and completion.</param>
        /// <param name="selector">Asynchronous function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();
            var values6 = new Queue();

            var isDone = new bool[6];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0)
                            {
                                TResult res;

                                try
                                {
                                    res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue()).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                await observer.OnNextAsync(res).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 6; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 6; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await
observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 6).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5),
                CreateObserver(5, values6)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 7-way zip. Each observer queues its elements; whenever every
        /// queue is non-empty, one element is dequeued from each and forwarded to <paramref name="observer"/> as a tuple.
        /// </summary>
        /// <param name="observer">Observer that receives the zipped tuples, errors and completion.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5, T6, T7)> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();
            var values6 = new Queue();
            var values7 = new Queue();

            var isDone = new bool[7];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0)
                            {
                                await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue())).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 7; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 7; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 7).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5),
                CreateObserver(5, values6),
                CreateObserver(6, values7)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 7-way zip whose results are produced by a synchronous
        /// <paramref name="selector"/>. Delegates to the overload taking a task-returning selector by wrapping
        /// the result with <c>Task.FromResult</c>.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results.</param>
        /// <param name="selector">Function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Zip(observer, (x1, x2, x3, x4, x5, x6, x7) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6, x7)));
        }

        /// <summary>
        /// Creates one observer per source for a 7-way zip whose results are produced by a task-returning
        /// <paramref name="selector"/>. An exception thrown by the selector is forwarded through
        /// <c>OnErrorAsync</c> instead of producing an element.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results, errors and completion.</param>
        /// <param name="selector">Asynchronous function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();
            var values6 = new Queue();
            var values7 = new Queue();

            var isDone = new bool[7];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0)
                            {
                                TResult res;

                                try
                                {
                                    res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue()).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                await observer.OnNextAsync(res).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 7; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await
observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 7; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 7).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5),
                CreateObserver(5, values6),
                CreateObserver(6, values7)
            );
        }

        /// <summary>
        /// Creates one observer per source for an 8-way zip. Each observer queues its elements; whenever every
        /// queue is non-empty, one element is dequeued from each and forwarded to <paramref name="observer"/> as a tuple.
        /// </summary>
        /// <param name="observer">Observer that receives the zipped tuples, errors and completion.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5, T6, T7, T8)> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();
            var values6 = new Queue();
            var values7 = new Queue();
            var values8 = new Queue();

            var isDone = new bool[8];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0)
                            {
                                await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue())).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 8; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 8; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 8).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5),
                CreateObserver(5, values6),
                CreateObserver(6, values7),
                CreateObserver(7, values8)
            );
        }

        /// <summary>
        /// Creates one observer per source for an 8-way zip whose results are produced by a synchronous
        /// <paramref name="selector"/>. Delegates to the overload taking a task-returning selector by wrapping
        /// the result with <c>Task.FromResult</c>.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results.</param>
        /// <param name="selector">Function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Zip(observer, (x1, x2, x3, x4, x5, x6, x7, x8) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6, x7, x8)));
        }

        /// <summary>
        /// Creates one observer per source for an 8-way zip whose results are produced by a task-returning
        /// <paramref name="selector"/>. An exception thrown by the selector is forwarded through
        /// <c>OnErrorAsync</c> instead of producing an element.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results, errors and completion.</param>
        /// <param name="selector">Asynchronous function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();
            var values6 = new Queue();
            var values7 = new Queue();
            var values8 = new Queue();

            var isDone = new bool[8];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0)
                            {
                                TResult res;

                                try
                                {
                                    res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(),
values6.Dequeue(), values7.Dequeue(), values8.Dequeue()).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                await observer.OnNextAsync(res).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 8; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 8; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 8).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5),
                CreateObserver(5, values6),
                CreateObserver(6, values7),
                CreateObserver(7, values8)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 9-way zip. Each observer queues its elements; whenever every
        /// queue is non-empty, one element is dequeued from each and forwarded to <paramref name="observer"/> as a tuple.
        /// </summary>
        /// <param name="observer">Observer that receives the zipped tuples, errors and completion.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5, T6, T7, T8, T9)> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();
            var values6 = new Queue();
            var values7 = new Queue();
            var values8 = new Queue();
            var values9 = new Queue();

            var isDone = new bool[9];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0)
                            {
                                await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue())).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 9; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 9; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 9).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5),
                CreateObserver(5, values6),
                CreateObserver(6, values7),
                CreateObserver(7, values8),
                CreateObserver(8, values9)
            );
        }

        /// <summary>
        /// Creates one observer per source for a 9-way zip whose results are produced by a synchronous
        /// <paramref name="selector"/>. Delegates to the overload taking a task-returning selector by wrapping
        /// the result with <c>Task.FromResult</c>.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results.</param>
        /// <param name="selector">Function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return Zip(observer, (x1, x2, x3, x4, x5, x6, x7, x8, x9) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6, x7, x8, x9)));
        }

        /// <summary>
        /// Creates one observer per source for a 9-way zip whose results are produced by a task-returning
        /// <paramref name="selector"/>. An exception thrown by the selector is forwarded through
        /// <c>OnErrorAsync</c> instead of producing an element.
        /// </summary>
        /// <param name="observer">Observer that receives the selector results, errors and completion.</param>
        /// <param name="selector">Asynchronous function combining one element from each source.</param>
        /// <returns>The per-source observers, in source order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            var gate = new AsyncLock();

            var values1 = new Queue();
            var values2 = new Queue();
            var values3 = new Queue();
            var values4 = new Queue();
            var values5 = new Queue();
            var values6 = new Queue();
            var values7 = new Queue();
            var values8 = new Queue();
            var values9 = new Queue();

            var isDone = new bool[9];

            IAsyncObserver CreateObserver(int index, Queue queue) =>
                Create(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queue.Enqueue(x);

                            if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0)
                            {
                                TResult res;

                                try
                                {
                                    res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue()).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                await observer.OnNextAsync(res).ConfigureAwait(false);
                            }
                            else
                            {
                                // No full set of elements yet: complete if every other source has already completed.
                                var allDone = true;

                                for (var i = 0; i < 9; i++)
                                {
                                    if (i != index && !isDone[i])
                                    {
                                        allDone = false;
                                        break;
                                    }
                                }

                                if (allDone)
                                {
                                    await observer.OnCompletedAsync().ConfigureAwait(false);
                                }
                            }
                        }
                    },
                    async ex =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            var allDone = true;

                            for (var i = 0; i < 9; i++)
                            {
                                if (!isDone[i])
                                {
                                    allDone = false;
                                    break;
                                }
                            }

                            if (allDone)
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            // Zero-based indices: each one is this observer's slot in isDone (length 9).
            return
            (
                CreateObserver(0, values1),
                CreateObserver(1, values2),
                CreateObserver(2, values3),
                CreateObserver(3, values4),
                CreateObserver(4, values5),
                CreateObserver(5, values6),
                CreateObserver(6, values7),
                CreateObserver(7, values8),
                CreateObserver(8, values9)
            );
        }

        public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1,
T2, T3, T4, T5, T6, T7, T8, T9, T10)> observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var isDone = new bool[10]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0) { await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue())).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 10; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 10; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10) ); } public static (IAsyncObserver, IAsyncObserver, 
IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Zip(observer, (x1, x2, x3, x4, x5, x6, x7, x8, x9, x10) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6, x7, x8, x9, x10))); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var isDone = new bool[10]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0) { TResult res; try { res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue()).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 10; i++) { if (i != index && 
!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 10; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11)> observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var isDone = new bool[11]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0) { await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), 
values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue())).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 11; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 11; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10), CreateObserver(11, values11) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Zip(observer, (x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11))); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new 
ArgumentNullException(nameof(selector)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var isDone = new bool[11]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0) { TResult res; try { res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue()).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 11; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 11; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, 
values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10), CreateObserver(11, values11) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12)> observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var values12 = new Queue(); var isDone = new bool[12]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0 && values12.Count > 0) { await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue(), values12.Dequeue())).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 12; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var 
allDone = true; for (var i = 0; i < 12; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10), CreateObserver(11, values11), CreateObserver(12, values12) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Zip(observer, (x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12))); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var values12 = new Queue(); var isDone = new bool[12]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { 
queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0 && values12.Count > 0) { TResult res; try { res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue(), values12.Dequeue()).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 12; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 12; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10), CreateObserver(11, values11), CreateObserver(12, values12) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13)> observer) { if (observer == 
null) throw new ArgumentNullException(nameof(observer)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var values12 = new Queue(); var values13 = new Queue(); var isDone = new bool[13]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0 && values12.Count > 0 && values13.Count > 0) { await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue(), values12.Dequeue(), values13.Dequeue())).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 13; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 13; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, 
values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10), CreateObserver(11, values11), CreateObserver(12, values12), CreateObserver(13, values13) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Zip(observer, (x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12, x13) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12, x13))); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var values12 = new Queue(); var values13 = new Queue(); var isDone = new bool[13]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0 && 
values12.Count > 0 && values13.Count > 0) { TResult res; try { res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue(), values12.Dequeue(), values13.Dequeue()).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 13; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 13; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10), CreateObserver(11, values11), CreateObserver(12, values12), CreateObserver(13, values13) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14)> observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new 
Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var values12 = new Queue(); var values13 = new Queue(); var values14 = new Queue(); var isDone = new bool[14]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0 && values12.Count > 0 && values13.Count > 0 && values14.Count > 0) { await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue(), values12.Dequeue(), values13.Dequeue(), values14.Dequeue())).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 14; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 14; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, 
values10), CreateObserver(11, values11), CreateObserver(12, values12), CreateObserver(13, values13), CreateObserver(14, values14) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Zip(observer, (x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12, x13, x14) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12, x13, x14))); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var values12 = new Queue(); var values13 = new Queue(); var values14 = new Queue(); var isDone = new bool[14]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0 
&& values12.Count > 0 && values13.Count > 0 && values14.Count > 0) { TResult res; try { res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue(), values12.Dequeue(), values13.Dequeue(), values14.Dequeue()).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 14; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 14; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10), CreateObserver(11, values11), CreateObserver(12, values12), CreateObserver(13, values13), CreateObserver(14, values14) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver<(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15)> observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var gate = 
new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var values12 = new Queue(); var values13 = new Queue(); var values14 = new Queue(); var values15 = new Queue(); var isDone = new bool[15]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0 && values12.Count > 0 && values13.Count > 0 && values14.Count > 0 && values15.Count > 0) { await observer.OnNextAsync((values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue(), values12.Dequeue(), values13.Dequeue(), values14.Dequeue(), values15.Dequeue())).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 15; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 15; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), 
CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10), CreateObserver(11, values11), CreateObserver(12, values12), CreateObserver(13, values13), CreateObserver(14, values14), CreateObserver(15, values15) ); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Zip(observer, (x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12, x13, x14, x15) => Task.FromResult(selector(x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12, x13, x14, x15))); } public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(IAsyncObserver observer, Func> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); var gate = new AsyncLock(); var values1 = new Queue(); var values2 = new Queue(); var values3 = new Queue(); var values4 = new Queue(); var values5 = new Queue(); var values6 = new Queue(); var values7 = new Queue(); var values8 = new Queue(); var values9 = new Queue(); var values10 = new Queue(); var values11 = new Queue(); var values12 = new Queue(); var values13 = new Queue(); var values14 = new Queue(); var values15 = new Queue(); var isDone = new bool[15]; IAsyncObserver CreateObserver(int index, Queue queue) => Create( async x => { using (await 
gate.LockAsync().ConfigureAwait(false)) { queue.Enqueue(x); if (values1.Count > 0 && values2.Count > 0 && values3.Count > 0 && values4.Count > 0 && values5.Count > 0 && values6.Count > 0 && values7.Count > 0 && values8.Count > 0 && values9.Count > 0 && values10.Count > 0 && values11.Count > 0 && values12.Count > 0 && values13.Count > 0 && values14.Count > 0 && values15.Count > 0) { TResult res; try { res = await selector(values1.Dequeue(), values2.Dequeue(), values3.Dequeue(), values4.Dequeue(), values5.Dequeue(), values6.Dequeue(), values7.Dequeue(), values8.Dequeue(), values9.Dequeue(), values10.Dequeue(), values11.Dequeue(), values12.Dequeue(), values13.Dequeue(), values14.Dequeue(), values15.Dequeue()).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); } else { var allDone = true; for (var i = 0; i < 15; i++) { if (i != index && !isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { isDone[index] = true; var allDone = true; for (var i = 0; i < 15; i++) { if (!isDone[i]) { allDone = false; break; } } if (allDone) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ); return ( CreateObserver(1, values1), CreateObserver(2, values2), CreateObserver(3, values3), CreateObserver(4, values4), CreateObserver(5, values5), CreateObserver(6, values6), CreateObserver(7, values7), CreateObserver(8, values8), CreateObserver(9, values9), CreateObserver(10, values10), CreateObserver(11, values11), CreateObserver(12, values12), CreateObserver(13, values13), CreateObserver(14, values14), CreateObserver(15, values15) ); } } }