// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    public partial class AsyncObservable
    {
        /// <summary>
        /// Projects each element of <paramref name="source"/> to an inner sequence and forwards the elements of
        /// every inner sequence to the resulting sequence.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="selector">Maps a source element to an inner sequence.</param>
        /// <returns>A sequence whose subscription combines the source subscription with the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, IAsyncObservable<TResult>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return CreateAsyncObservable<TResult>.From(
                source,
                selector,
                static async (source, selector, observer) =>
                {
                    // 'sink' receives the source elements; 'inner' tracks the inner subscriptions.
                    var (sink, inner) = AsyncObserver.SelectMany(observer, selector);

                    var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(subscription, inner);
                });
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/> to an inner sequence using an asynchronous selector
        /// and forwards the elements of every inner sequence to the resulting sequence.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="selector">Asynchronously maps a source element to an inner sequence.</param>
        /// <returns>A sequence whose subscription combines the source subscription with the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<IAsyncObservable<TResult>>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return CreateAsyncObservable<TResult>.From(
                source,
                selector,
                static async (source, selector, observer) =>
                {
                    var (sink, inner) = AsyncObserver.SelectMany(observer, selector);

                    var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(subscription, inner);
                });
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/> to an inner sequence and emits the value computed by
        /// <paramref name="resultSelector"/> for every pair of source element and inner element.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of the result.</typeparam>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="collectionSelector">Maps a source element to an inner sequence.</param>
        /// <param name="resultSelector">Combines a source element with an element of its inner sequence.</param>
        /// <returns>A sequence whose subscription combines the source subscription with the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (collectionSelector == null)
                throw new ArgumentNullException(nameof(collectionSelector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return CreateAsyncObservable<TResult>.From(
                source,
                (collectionSelector, resultSelector),
                static async (source, state, observer) =>
                {
                    var (sink, inner) = AsyncObserver.SelectMany(observer, state.collectionSelector, state.resultSelector);

                    var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(subscription, inner);
                });
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/> to an inner sequence using an asynchronous selector
        /// and emits the value asynchronously computed by <paramref name="resultSelector"/> for every pair of
        /// source element and inner element.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of the result.</typeparam>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="collectionSelector">Asynchronously maps a source element to an inner sequence.</param>
        /// <param name="resultSelector">Asynchronously combines a source element with an element of its inner sequence.</param>
        /// <returns>A sequence whose subscription combines the source subscription with the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (collectionSelector == null)
                throw new ArgumentNullException(nameof(collectionSelector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return CreateAsyncObservable<TResult>.From(
                source,
                (collectionSelector, resultSelector),
                static async (source, state, observer) =>
                {
                    var (sink, inner) = AsyncObserver.SelectMany(observer, state.collectionSelector, state.resultSelector);

                    var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(subscription, inner);
                });
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/>, together with its index, to an inner sequence and
        /// forwards the elements of every inner sequence to the resulting sequence.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="selector">Maps a source element and its index to an inner sequence.</param>
        /// <returns>A sequence whose subscription combines the source subscription with the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, IAsyncObservable<TResult>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return CreateAsyncObservable<TResult>.From(
                source,
                selector,
                static async (source, selector, observer) =>
                {
                    var (sink, inner) = AsyncObserver.SelectMany(observer, selector);

                    var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(subscription, inner);
                });
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/>, together with its index, to an inner sequence using
        /// an asynchronous selector and forwards the elements of every inner sequence to the resulting sequence.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="selector">Asynchronously maps a source element and its index to an inner sequence.</param>
        /// <returns>A sequence whose subscription combines the source subscription with the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, ValueTask<IAsyncObservable<TResult>>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return CreateAsyncObservable<TResult>.From(
                source,
                selector,
                static async (source, selector, observer) =>
                {
                    var (sink, inner) = AsyncObserver.SelectMany(observer, selector);

                    var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(subscription, inner);
                });
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/>, together with its index, to an inner sequence and
        /// emits the value computed by <paramref name="resultSelector"/> from the source element, its index, the
        /// inner element and the inner element's index.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of the result.</typeparam>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="collectionSelector">Maps a source element and its index to an inner sequence.</param>
        /// <param name="resultSelector">Combines a source element and index with an inner element and index.</param>
        /// <returns>A sequence whose subscription combines the source subscription with the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (collectionSelector == null)
                throw new ArgumentNullException(nameof(collectionSelector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return CreateAsyncObservable<TResult>.From(
                source,
                (collectionSelector, resultSelector),
                static async (source, state, observer) =>
                {
                    var (sink, inner) = AsyncObserver.SelectMany(observer, state.collectionSelector, state.resultSelector);

                    var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(subscription, inner);
                });
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/>, together with its index, to an inner sequence using
        /// an asynchronous selector and emits the value asynchronously computed by
        /// <paramref name="resultSelector"/> from the source element, its index, the inner element and the inner
        /// element's index.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of the result.</typeparam>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="collectionSelector">Asynchronously maps a source element and its index to an inner sequence.</param>
        /// <param name="resultSelector">Asynchronously combines a source element and index with an inner element and index.</param>
        /// <returns>A sequence whose subscription combines the source subscription with the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, int, TCollection, int, ValueTask<TResult>> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (collectionSelector == null)
                throw new ArgumentNullException(nameof(collectionSelector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return CreateAsyncObservable<TResult>.From(
                source,
                (collectionSelector, resultSelector),
                static async (source, state, observer) =>
                {
                    var (sink, inner) = AsyncObserver.SelectMany(observer, state.collectionSelector, state.resultSelector);

                    var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(subscription, inner);
                });
        }
    }

    public partial class AsyncObserver
    {
        /// <summary>
        /// Creates an observer for source elements that subscribes <paramref name="observer"/> to the inner
        /// sequence produced by <paramref name="selector"/> for each element.
        /// </summary>
        /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of <paramref name="observer"/>.</typeparam>
        /// <param name="observer">Downstream observer receiving the inner elements.</param>
        /// <param name="selector">Maps a source element to an inner sequence.</param>
        /// <returns>The observer to subscribe to the source, and a disposable holding the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, IAsyncObservable<TResult>> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            // Wrap the synchronous selector and use an identity result selector to reuse the asynchronous core.
            return SelectMany<TSource, TResult, TResult>(observer, x => new ValueTask<IAsyncObservable<TResult>>(selector(x)), (x, y) => new ValueTask<TResult>(y));
        }

        /// <summary>
        /// Creates an observer for source elements that subscribes <paramref name="observer"/> to the inner
        /// sequence asynchronously produced by <paramref name="selector"/> for each element.
        /// </summary>
        /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of <paramref name="observer"/>.</typeparam>
        /// <param name="observer">Downstream observer receiving the inner elements.</param>
        /// <param name="selector">Asynchronously maps a source element to an inner sequence.</param>
        /// <returns>The observer to subscribe to the source, and a disposable holding the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, ValueTask<IAsyncObservable<TResult>>> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return SelectMany<TSource, TResult, TResult>(observer, selector, (x, y) => new ValueTask<TResult>(y));
        }

        /// <summary>
        /// Creates an observer for source elements that subscribes to the inner sequence produced by
        /// <paramref name="collectionSelector"/> and sends the values computed by
        /// <paramref name="resultSelector"/> to <paramref name="observer"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of <paramref name="observer"/>.</typeparam>
        /// <param name="observer">Downstream observer receiving the results.</param>
        /// <param name="collectionSelector">Maps a source element to an inner sequence.</param>
        /// <param name="resultSelector">Combines a source element with an element of its inner sequence.</param>
        /// <returns>The observer to subscribe to the source, and a disposable holding the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (collectionSelector == null)
                throw new ArgumentNullException(nameof(collectionSelector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return SelectMany<TSource, TCollection, TResult>(observer, x => new ValueTask<IAsyncObservable<TCollection>>(collectionSelector(x)), (x, y) => new ValueTask<TResult>(resultSelector(x, y)));
        }

        /// <summary>
        /// Core implementation: creates an observer for source elements that subscribes to the inner sequence
        /// asynchronously produced by <paramref name="collectionSelector"/> and sends the values asynchronously
        /// computed by <paramref name="resultSelector"/> to <paramref name="observer"/>.
        /// </summary>
        /// <remarks>
        /// All calls to <paramref name="observer"/> are made while holding a single gate. Completion is
        /// forwarded once the source and every subscribed inner sequence have completed. Exceptions thrown by the
        /// selectors, and errors of the source or of any inner sequence, are forwarded through
        /// <c>OnErrorAsync</c>.
        /// </remarks>
        /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of <paramref name="observer"/>.</typeparam>
        /// <param name="observer">Downstream observer receiving the results.</param>
        /// <param name="collectionSelector">Asynchronously maps a source element to an inner sequence.</param>
        /// <param name="resultSelector">Asynchronously combines a source element with an element of its inner sequence.</param>
        /// <returns>The observer to subscribe to the source, and a disposable holding the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (collectionSelector == null)
                throw new ArgumentNullException(nameof(collectionSelector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            var gate = new AsyncGate();

            // Number of sequences still running; starts at 1 to account for the outer (source) sequence.
            var count = 1;

            var disposable = new CompositeAsyncDisposable();

            async ValueTask OnErrorAsync(Exception ex)
            {
                using (await gate.LockAsync().ConfigureAwait(false))
                {
                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                }
            }

            async ValueTask OnCompletedAsync()
            {
                using (await gate.LockAsync().ConfigureAwait(false))
                {
                    // Only the last sequence to finish (outer or inner) completes the downstream observer.
                    if (--count == 0)
                    {
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                }
            }

            return
            (
                Create<TSource>(
                    async x =>
                    {
                        var collection = default(IAsyncObservable<TCollection>);

                        try
                        {
                            collection = await collectionSelector(x).ConfigureAwait(false);
                        }
                        catch (Exception ex)
                        {
                            await OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        // Register the new inner sequence before subscribing to it.
                        using (await gate.LockAsync().ConfigureAwait(false))
                        {
                            count++;
                        }

                        // Added to the composite before the subscription exists; assigned once it is available.
                        var inner = new SingleAssignmentAsyncDisposable();

                        await disposable.AddAsync(inner).ConfigureAwait(false);

                        var innerObserver = Create<TCollection>(
                            async y =>
                            {
                                var res = default(TResult);

                                try
                                {
                                    res = await resultSelector(x, y).ConfigureAwait(false);
                                }
                                catch (Exception ex)
                                {
                                    await OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                using (await gate.LockAsync().ConfigureAwait(false))
                                {
                                    await observer.OnNextAsync(res).ConfigureAwait(false);
                                }
                            },
                            OnErrorAsync,
                            async () =>
                            {
                                await OnCompletedAsync().ConfigureAwait(false);

                                await disposable.RemoveAsync(inner).ConfigureAwait(false);
                            }
                        );

                        var innerSubscription = await collection.SubscribeSafeAsync(innerObserver).ConfigureAwait(false);

                        await inner.AssignAsync(innerSubscription).ConfigureAwait(false);
                    },
                    OnErrorAsync,
                    OnCompletedAsync
                ),
                disposable
            );
        }

        /// <summary>
        /// Creates an observer for source elements that subscribes <paramref name="observer"/> to the inner
        /// sequence produced by <paramref name="selector"/> for each element and its index.
        /// </summary>
        /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of <paramref name="observer"/>.</typeparam>
        /// <param name="observer">Downstream observer receiving the inner elements.</param>
        /// <param name="selector">Maps a source element and its index to an inner sequence.</param>
        /// <returns>The observer to subscribe to the source, and a disposable holding the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, IAsyncObservable<TResult>> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return SelectMany<TSource, TResult, TResult>(observer, (x, i) => new ValueTask<IAsyncObservable<TResult>>(selector(x, i)), (x, i, y, j) => new ValueTask<TResult>(y));
        }

        /// <summary>
        /// Creates an observer for source elements that subscribes <paramref name="observer"/> to the inner
        /// sequence asynchronously produced by <paramref name="selector"/> for each element and its index.
        /// </summary>
        /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of <paramref name="observer"/>.</typeparam>
        /// <param name="observer">Downstream observer receiving the inner elements.</param>
        /// <param name="selector">Asynchronously maps a source element and its index to an inner sequence.</param>
        /// <returns>The observer to subscribe to the source, and a disposable holding the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, ValueTask<IAsyncObservable<TResult>>> selector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return SelectMany<TSource, TResult, TResult>(observer, selector, (x, i, y, j) => new ValueTask<TResult>(y));
        }

        /// <summary>
        /// Creates an observer for source elements that subscribes to the inner sequence produced by
        /// <paramref name="collectionSelector"/> for each element and its index, and sends the values computed by
        /// <paramref name="resultSelector"/> to <paramref name="observer"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of <paramref name="observer"/>.</typeparam>
        /// <param name="observer">Downstream observer receiving the results.</param>
        /// <param name="collectionSelector">Maps a source element and its index to an inner sequence.</param>
        /// <param name="resultSelector">Combines a source element and index with an inner element and index.</param>
        /// <returns>The observer to subscribe to the source, and a disposable holding the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (collectionSelector == null)
                throw new ArgumentNullException(nameof(collectionSelector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return SelectMany<TSource, TCollection, TResult>(observer, (x, i) => new ValueTask<IAsyncObservable<TCollection>>(collectionSelector(x, i)), (x, i, y, j) => new ValueTask<TResult>(resultSelector(x, i, y, j)));
        }

        /// <summary>
        /// Creates an observer for source elements that subscribes to the inner sequence asynchronously produced
        /// by <paramref name="collectionSelector"/> for each element and its index, and sends the values
        /// asynchronously computed by <paramref name="resultSelector"/> to <paramref name="observer"/>.
        /// </summary>
        /// <remarks>
        /// Implemented on top of the non-indexed overload by pairing every outer and inner element with its index.
        /// </remarks>
        /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of <paramref name="observer"/>.</typeparam>
        /// <param name="observer">Downstream observer receiving the results.</param>
        /// <param name="collectionSelector">Asynchronously maps a source element and its index to an inner sequence.</param>
        /// <param name="resultSelector">Asynchronously combines a source element and index with an inner element and index.</param>
        /// <returns>The observer to subscribe to the source, and a disposable holding the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, int, TCollection, int, ValueTask<TResult>> resultSelector)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (collectionSelector == null)
                throw new ArgumentNullException(nameof(collectionSelector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            // Unpacks the indexed outer element and pairs every inner element with its own index.
            async ValueTask<IAsyncObservable<(TCollection item, int i)>> collectionSelectorWithIndex((TSource item, int i) t) => (await collectionSelector(t.item, t.i).ConfigureAwait(false)).Select((item, i) => (item, i));

            ValueTask<TResult> resultSelectorWithIndex((TSource item, int i) outer, (TCollection item, int i) inner) => resultSelector(outer.item, outer.i, inner.item, inner.i);

            // The explicit delegate cast gives type inference the tuple element types for the generic call.
            var (outerObserverWithIndex, disposable) = SelectMany(observer, collectionSelectorWithIndex, (Func<(TSource item, int i), (TCollection item, int i), ValueTask<TResult>>)resultSelectorWithIndex);

            // Attach the index to each outer element before handing it to the indexed observer.
            var outerObserver = Select<TSource, (TSource item, int i)>(outerObserverWithIndex, (item, i) => (item, i));

            return (outerObserver, disposable);
        }
    }
}