// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    public partial class AsyncObservable
    {
        // TODO: Add Zip<T>(IAsyncObservable<T>, IAsyncEnumerable<T>) overload when we have reference to IAsyncEnumerable.

        /// <summary>
        /// Combines the given sources index-wise: each emitted list holds one element from every source,
        /// in source order.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequences.</typeparam>
        /// <param name="sources">The sequences to zip. Enumerated once, eagerly, into an array.</param>
        /// <returns>A sequence of lists, each containing one element per source.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="sources"/> is <c>null</c>.</exception>
        public static IAsyncObservable<IList<TSource>> Zip<TSource>(IEnumerable<IAsyncObservable<TSource>> sources)
        {
            // Validate here so the exception names this method's parameter rather than
            // the "source" parameter of Enumerable.ToArray.
            if (sources == null)
            {
                throw new ArgumentNullException(nameof(sources));
            }

            return Zip(sources.ToArray());
        }

        /// <summary>
        /// Combines the given sources index-wise: each emitted list holds one element from every source,
        /// in source order.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequences.</typeparam>
        /// <param name="sources">The sequences to zip.</param>
        /// <returns>A sequence of lists, each containing one element per source.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="sources"/> is <c>null</c>.</exception>
        public static IAsyncObservable<IList<TSource>> Zip<TSource>(params IAsyncObservable<TSource>[] sources)
        {
            if (sources == null)
            {
                throw new ArgumentNullException(nameof(sources));
            }

            return Create<IList<TSource>>(async observer =>
            {
                var count = sources.Length;

                // One inner observer per source; all of them feed the same downstream observer.
                var observers = AsyncObserver.Zip(observer, count);

                var tasks = new Task<IAsyncDisposable>[count];

                // Start every subscription before awaiting any of them.
                for (var i = 0; i < count; i++)
                {
                    tasks[i] = sources[i].SubscribeSafeAsync(observers[i]).AsTask();
                }

                await Task.WhenAll(tasks).ConfigureAwait(false);

                // Every task has completed successfully at this point, so reading Result does not block.
                return StableCompositeAsyncDisposable.Create(tasks.Select(t => t.Result));
            });
        }
    }

    public partial class AsyncObserver
    {
        /// <summary>
        /// Creates <paramref name="count"/> observers that buffer incoming elements per index and forward
        /// a combined list to <paramref name="observer"/> whenever every buffer holds at least one element.
        /// </summary>
        /// <typeparam name="TSource">Element type received by the returned observers.</typeparam>
        /// <param name="observer">The downstream observer receiving the zipped lists.</param>
        /// <param name="count">Number of observers to create.</param>
        /// <returns>An array of <paramref name="count"/> observers, one per zipped input.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
        public static IAsyncObserver<TSource>[] Zip<TSource>(IAsyncObserver<IList<TSource>> observer, int count)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (count < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(count));
            }

            // The gate is acquired by every callback before it touches the queues or the completion flags.
            var gate = new AsyncGate();

            var queues = new Queue<TSource>[count];
            var isDone = new bool[count];
            var res = new IAsyncObserver<TSource>[count];

            // Returns true when every input other than 'skip' has completed.
            // Pass an index outside [0, count) to test all inputs.
            bool AllDoneExcept(int skip)
            {
                for (var i = 0; i < count; i++)
                {
                    if (i != skip && !isDone[i])
                    {
                        return false;
                    }
                }

                return true;
            }

            IAsyncObserver<TSource> CreateObserver(int index) =>
                Create<TSource>(
                    async x =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            queues[index].Enqueue(x);

                            if (queues.All(queue => queue.Count > 0))
                            {
                                // Every input has a pending element: take one from each and emit them together.
                                var list = new TSource[count];

                                for (var i = 0; i < count; i++)
                                {
                                    list[i] = queues[i].Dequeue();
                                }

                                await observer.OnNextAsync(list).ConfigureAwait(false);
                            }
                            else if (AllDoneExcept(index))
                            {
                                // Some queue is empty and every other input has already completed.
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    },
                    async ex =>
                    {
                        // An error from any input is forwarded downstream as-is.
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        }
                    },
                    async () =>
                    {
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            isDone[index] = true;

                            // This input is now flagged, so "all others done" is the same as "all done".
                            if (AllDoneExcept(index))
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    }
                );

            for (var i = 0; i < count; i++)
            {
                queues[i] = new Queue<TSource>();
                res[i] = CreateObserver(i);
            }

            return res;
        }
    }
}