// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq { partial class AsyncObservable { public static IAsyncObservable SelectMany(this IAsyncObservable source, Func> selector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var (sink, inner) = AsyncObserver.SelectMany(observer, selector); var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false); return StableCompositeAsyncDisposable.Create(subscription, inner); }); } public static IAsyncObservable SelectMany(this IAsyncObservable source, Func>> selector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var (sink, inner) = AsyncObserver.SelectMany(observer, selector); var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false); return StableCompositeAsyncDisposable.Create(subscription, inner); }); } public static IAsyncObservable SelectMany(this IAsyncObservable source, Func> collectionSelector, Func resultSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (collectionSelector == null) throw new ArgumentNullException(nameof(collectionSelector)); if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); return Create(async observer => { var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector); var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false); return StableCompositeAsyncDisposable.Create(subscription, inner); }); } public static IAsyncObservable SelectMany(this IAsyncObservable source, Func>> collectionSelector, Func> resultSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (collectionSelector == null) throw new ArgumentNullException(nameof(collectionSelector)); if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); return Create(async observer => { var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector); var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false); return StableCompositeAsyncDisposable.Create(subscription, inner); }); } public static IAsyncObservable SelectMany(this IAsyncObservable source, Func> selector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var (sink, inner) = AsyncObserver.SelectMany(observer, selector); var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false); return StableCompositeAsyncDisposable.Create(subscription, inner); }); } public static IAsyncObservable SelectMany(this IAsyncObservable source, Func>> selector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return Create(async observer => { var (sink, inner) = AsyncObserver.SelectMany(observer, selector); var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false); return StableCompositeAsyncDisposable.Create(subscription, inner); }); } public static IAsyncObservable SelectMany(this IAsyncObservable source, Func> collectionSelector, Func resultSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (collectionSelector == null) throw new ArgumentNullException(nameof(collectionSelector)); if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); return Create(async observer => { var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector); var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false); return StableCompositeAsyncDisposable.Create(subscription, inner); }); } public static IAsyncObservable SelectMany(this IAsyncObservable source, Func>> collectionSelector, Func> resultSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (collectionSelector == null) throw new ArgumentNullException(nameof(collectionSelector)); if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); return Create(async observer => { var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector); var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false); return StableCompositeAsyncDisposable.Create(subscription, inner); }); } } partial class AsyncObserver { public static (IAsyncObserver, IAsyncDisposable) SelectMany(IAsyncObserver observer, Func> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return SelectMany(observer, x => Task.FromResult(selector(x)), (x, y) => Task.FromResult(y)); } public static (IAsyncObserver, IAsyncDisposable) SelectMany(IAsyncObserver observer, Func>> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return SelectMany(observer, selector, (x, y) => Task.FromResult(y)); } public static (IAsyncObserver, IAsyncDisposable) SelectMany(IAsyncObserver observer, Func> collectionSelector, Func resultSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (collectionSelector == null) throw new ArgumentNullException(nameof(collectionSelector)); if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); return SelectMany(observer, x => Task.FromResult(collectionSelector(x)), (x, y) => Task.FromResult(resultSelector(x, y))); } public static (IAsyncObserver, IAsyncDisposable) SelectMany(IAsyncObserver observer, Func>> collectionSelector, Func> resultSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (collectionSelector == null) throw new ArgumentNullException(nameof(collectionSelector)); if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); var gate = new AsyncLock(); var count = 1; var disposable = new CompositeAsyncDisposable(); async Task OnErrorAsync(Exception ex) { using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }; async Task OnCompletedAsync() { using (await gate.LockAsync().ConfigureAwait(false)) { if (--count == 0) { await observer.OnCompletedAsync().ConfigureAwait(false); } } }; return ( Create( async x => { var collection = default(IAsyncObservable); try { collection = await collectionSelector(x).ConfigureAwait(false); } catch (Exception ex) { await OnErrorAsync(ex).ConfigureAwait(false); return; } using (await gate.LockAsync().ConfigureAwait(false)) { count++; } var inner = new SingleAssignmentAsyncDisposable(); await disposable.AddAsync(inner).ConfigureAwait(false); var innerObserver = Create( async y => { var res = default(TResult); try { res = await resultSelector(x, y).ConfigureAwait(false); } catch (Exception ex) { await OnErrorAsync(ex).ConfigureAwait(false); return; } using (await gate.LockAsync().ConfigureAwait(false)) { await observer.OnNextAsync(res).ConfigureAwait(false); } }, OnErrorAsync, async () => { await OnCompletedAsync().ConfigureAwait(false); await disposable.RemoveAsync(inner).ConfigureAwait(false); } ); var innerSubscription = await collection.SubscribeSafeAsync(innerObserver).ConfigureAwait(false); await inner.AssignAsync(innerSubscription).ConfigureAwait(false); }, OnErrorAsync, OnCompletedAsync ), disposable ); } public static (IAsyncObserver, IAsyncDisposable) SelectMany(IAsyncObserver observer, Func> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return SelectMany(observer, (x, i) => Task.FromResult(selector(x, i)), (x, i, y, j) => Task.FromResult(y)); } public static (IAsyncObserver, IAsyncDisposable) SelectMany(IAsyncObserver observer, Func>> selector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (selector == null) throw new ArgumentNullException(nameof(selector)); return SelectMany(observer, selector, (x, i, y, j) => Task.FromResult(y)); } public static (IAsyncObserver, IAsyncDisposable) SelectMany(IAsyncObserver observer, Func> collectionSelector, Func resultSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (collectionSelector == null) throw new ArgumentNullException(nameof(collectionSelector)); if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); return SelectMany(observer, (x, i) => Task.FromResult(collectionSelector(x, i)), (x, i, y, j) => Task.FromResult(resultSelector(x, i, y, j))); } public static (IAsyncObserver, IAsyncDisposable) SelectMany(IAsyncObserver observer, Func>> collectionSelector, Func> resultSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (collectionSelector == null) throw new ArgumentNullException(nameof(collectionSelector)); if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); Func<(TSource item, int i), Task>> collectionSelectorWithIndex = async t => (await collectionSelector(t.item, t.i).ConfigureAwait(false)).Select((item, i) => (item, i)); Func<(TSource item, int i), (TCollection item, int i), Task> resultSelectorWithIndex = (outer, inner) => resultSelector(outer.item, outer.i, inner.item, inner.i); var (outerObserverWithIndex, disposable) = SelectMany(observer, collectionSelectorWithIndex, resultSelectorWithIndex); var outerObserver = Select(outerObserverWithIndex, (item, i) => (item, i)); return (outerObserver, disposable); } } }