// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    public partial class AsyncObservable
    {
        /// <summary>
        /// Maps every element of <paramref name="source"/> to an inner sequence and forwards the elements of
        /// all inner sequences to the subscriber.
        /// </summary>
        /// <param name="source">The outer sequence.</param>
        /// <param name="selector">Produces the inner sequence for an outer element.</param>
        /// <returns>A sequence of the elements produced by the inner sequences.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, IAsyncObservable<TResult>> selector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (selector is null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            return CreateAsyncObservable<TResult>.From(
                source,
                selector,
                static async (upstream, project, downstream) =>
                {
                    // The observer-level operator hands back the sink for the outer sequence plus the
                    // disposable that tracks the inner subscriptions.
                    var (sink, innerSubscriptions) = AsyncObserver.SelectMany(downstream, project);
                    var outerSubscription = await upstream.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(outerSubscription, innerSubscriptions);
                });
        }

        /// <summary>
        /// Maps every element of <paramref name="source"/> to an inner sequence using an asynchronous selector
        /// and forwards the elements of all inner sequences to the subscriber.
        /// </summary>
        /// <param name="source">The outer sequence.</param>
        /// <param name="selector">Asynchronously produces the inner sequence for an outer element.</param>
        /// <returns>A sequence of the elements produced by the inner sequences.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<IAsyncObservable<TResult>>> selector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (selector is null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            return CreateAsyncObservable<TResult>.From(
                source,
                selector,
                static async (upstream, project, downstream) =>
                {
                    var (sink, innerSubscriptions) = AsyncObserver.SelectMany(downstream, project);
                    var outerSubscription = await upstream.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(outerSubscription, innerSubscriptions);
                });
        }

        /// <summary>
        /// Maps every element of <paramref name="source"/> to an inner sequence and emits the value computed by
        /// <paramref name="resultSelector"/> for each (outer element, inner element) pair.
        /// </summary>
        /// <param name="source">The outer sequence.</param>
        /// <param name="collectionSelector">Produces the inner sequence for an outer element.</param>
        /// <param name="resultSelector">Combines an outer element with one element of its inner sequence.</param>
        /// <returns>A sequence of the combined values.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (collectionSelector is null)
            {
                throw new ArgumentNullException(nameof(collectionSelector));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            return CreateAsyncObservable<TResult>.From(
                source,
                (collectionSelector, resultSelector),
                static async (upstream, selectors, downstream) =>
                {
                    var (sink, innerSubscriptions) = AsyncObserver.SelectMany(downstream, selectors.collectionSelector, selectors.resultSelector);
                    var outerSubscription = await upstream.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(outerSubscription, innerSubscriptions);
                });
        }

        /// <summary>
        /// Maps every element of <paramref name="source"/> to an inner sequence using an asynchronous selector
        /// and emits the value asynchronously computed by <paramref name="resultSelector"/> for each
        /// (outer element, inner element) pair.
        /// </summary>
        /// <param name="source">The outer sequence.</param>
        /// <param name="collectionSelector">Asynchronously produces the inner sequence for an outer element.</param>
        /// <param name="resultSelector">Asynchronously combines an outer element with one element of its inner sequence.</param>
        /// <returns>A sequence of the combined values.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (collectionSelector is null)
            {
                throw new ArgumentNullException(nameof(collectionSelector));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            return CreateAsyncObservable<TResult>.From(
                source,
                (collectionSelector, resultSelector),
                static async (upstream, selectors, downstream) =>
                {
                    var (sink, innerSubscriptions) = AsyncObserver.SelectMany(downstream, selectors.collectionSelector, selectors.resultSelector);
                    var outerSubscription = await upstream.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(outerSubscription, innerSubscriptions);
                });
        }

        /// <summary>
        /// Maps every element of <paramref name="source"/>, together with its index, to an inner sequence and
        /// forwards the elements of all inner sequences to the subscriber.
        /// </summary>
        /// <param name="source">The outer sequence.</param>
        /// <param name="selector">Produces the inner sequence for an outer element and its index.</param>
        /// <returns>A sequence of the elements produced by the inner sequences.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, IAsyncObservable<TResult>> selector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (selector is null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            return CreateAsyncObservable<TResult>.From(
                source,
                selector,
                static async (upstream, project, downstream) =>
                {
                    var (sink, innerSubscriptions) = AsyncObserver.SelectMany(downstream, project);
                    var outerSubscription = await upstream.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(outerSubscription, innerSubscriptions);
                });
        }

        /// <summary>
        /// Maps every element of <paramref name="source"/>, together with its index, to an inner sequence using
        /// an asynchronous selector and forwards the elements of all inner sequences to the subscriber.
        /// </summary>
        /// <param name="source">The outer sequence.</param>
        /// <param name="selector">Asynchronously produces the inner sequence for an outer element and its index.</param>
        /// <returns>A sequence of the elements produced by the inner sequences.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, ValueTask<IAsyncObservable<TResult>>> selector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (selector is null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            return CreateAsyncObservable<TResult>.From(
                source,
                selector,
                static async (upstream, project, downstream) =>
                {
                    var (sink, innerSubscriptions) = AsyncObserver.SelectMany(downstream, project);
                    var outerSubscription = await upstream.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(outerSubscription, innerSubscriptions);
                });
        }

        /// <summary>
        /// Maps every element of <paramref name="source"/>, together with its index, to an inner sequence and
        /// emits the value computed by <paramref name="resultSelector"/> from the outer element, its index,
        /// the inner element and the inner element's index.
        /// </summary>
        /// <param name="source">The outer sequence.</param>
        /// <param name="collectionSelector">Produces the inner sequence for an outer element and its index.</param>
        /// <param name="resultSelector">Combines an indexed outer element with an indexed inner element.</param>
        /// <returns>A sequence of the combined values.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (collectionSelector is null)
            {
                throw new ArgumentNullException(nameof(collectionSelector));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            return CreateAsyncObservable<TResult>.From(
                source,
                (collectionSelector, resultSelector),
                static async (upstream, selectors, downstream) =>
                {
                    var (sink, innerSubscriptions) = AsyncObserver.SelectMany(downstream, selectors.collectionSelector, selectors.resultSelector);
                    var outerSubscription = await upstream.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(outerSubscription, innerSubscriptions);
                });
        }

        /// <summary>
        /// Maps every element of <paramref name="source"/>, together with its index, to an inner sequence using
        /// an asynchronous selector and emits the value asynchronously computed by
        /// <paramref name="resultSelector"/> from the outer element, its index, the inner element and the
        /// inner element's index.
        /// </summary>
        /// <param name="source">The outer sequence.</param>
        /// <param name="collectionSelector">Asynchronously produces the inner sequence for an outer element and its index.</param>
        /// <param name="resultSelector">Asynchronously combines an indexed outer element with an indexed inner element.</param>
        /// <returns>A sequence of the combined values.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, int, TCollection, int, ValueTask<TResult>> resultSelector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (collectionSelector is null)
            {
                throw new ArgumentNullException(nameof(collectionSelector));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            return CreateAsyncObservable<TResult>.From(
                source,
                (collectionSelector, resultSelector),
                static async (upstream, selectors, downstream) =>
                {
                    var (sink, innerSubscriptions) = AsyncObserver.SelectMany(downstream, selectors.collectionSelector, selectors.resultSelector);
                    var outerSubscription = await upstream.SubscribeSafeAsync(sink).ConfigureAwait(false);

                    return StableCompositeAsyncDisposable.Create(outerSubscription, innerSubscriptions);
                });
        }
    }

    public partial class AsyncObserver
    {
        /// <summary>
        /// Builds the observer for the outer sequence of a SelectMany whose inner elements are forwarded
        /// unchanged. Delegates to the asynchronous collection/result selector overload.
        /// </summary>
        /// <param name="observer">The observer receiving the inner elements.</param>
        /// <param name="selector">Produces the inner sequence for an outer element.</param>
        /// <returns>The outer observer and the disposable tracking the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is null.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, IAsyncObservable<TResult>> selector)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (selector is null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            return SelectMany<TSource, TResult, TResult>(
                observer,
                outerItem => new ValueTask<IAsyncObservable<TResult>>(selector(outerItem)),
                (outerItem, innerItem) => new ValueTask<TResult>(innerItem));
        }

        /// <summary>
        /// Builds the observer for the outer sequence of a SelectMany with an asynchronous selector whose inner
        /// elements are forwarded unchanged. Delegates to the asynchronous collection/result selector overload.
        /// </summary>
        /// <param name="observer">The observer receiving the inner elements.</param>
        /// <param name="selector">Asynchronously produces the inner sequence for an outer element.</param>
        /// <returns>The outer observer and the disposable tracking the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is null.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, ValueTask<IAsyncObservable<TResult>>> selector)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (selector is null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            return SelectMany<TSource, TResult, TResult>(
                observer,
                selector,
                (outerItem, innerItem) => new ValueTask<TResult>(innerItem));
        }

        /// <summary>
        /// Builds the observer for the outer sequence of a SelectMany with synchronous selectors by wrapping
        /// both selectors in <see cref="ValueTask{TResult}"/> and delegating to the asynchronous overload.
        /// </summary>
        /// <param name="observer">The observer receiving the combined values.</param>
        /// <param name="collectionSelector">Produces the inner sequence for an outer element.</param>
        /// <param name="resultSelector">Combines an outer element with one element of its inner sequence.</param>
        /// <returns>The outer observer and the disposable tracking the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (collectionSelector is null)
            {
                throw new ArgumentNullException(nameof(collectionSelector));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            return SelectMany<TSource, TCollection, TResult>(
                observer,
                outerItem => new ValueTask<IAsyncObservable<TCollection>>(collectionSelector(outerItem)),
                (outerItem, innerItem) => new ValueTask<TResult>(resultSelector(outerItem, innerItem)));
        }

        /// <summary>
        /// Core SelectMany implementation: builds the observer for the outer sequence, which subscribes to one
        /// inner sequence per outer element and forwards the combined values to <paramref name="observer"/>.
        /// </summary>
        /// <remarks>
        /// Every notification sent to <paramref name="observer"/> is issued while holding a shared gate.
        /// Completion is forwarded once the outer sequence and every inner sequence subscribed so far have
        /// completed. Exceptions thrown by either selector are forwarded through OnErrorAsync.
        /// </remarks>
        /// <param name="observer">The observer receiving the combined values.</param>
        /// <param name="collectionSelector">Asynchronously produces the inner sequence for an outer element.</param>
        /// <param name="resultSelector">Asynchronously combines an outer element with one element of its inner sequence.</param>
        /// <returns>The outer observer and the composite disposable holding the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (collectionSelector is null)
            {
                throw new ArgumentNullException(nameof(collectionSelector));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            var gate = new AsyncGate();
            var innerSubscriptions = new CompositeAsyncDisposable();

            // Sequences that still have to complete; starts at 1 to account for the outer sequence.
            var active = 1;

            async ValueTask ForwardErrorAsync(Exception error)
            {
                using (await gate.LockAsync().ConfigureAwait(false))
                {
                    await observer.OnErrorAsync(error).ConfigureAwait(false);
                }
            }

            async ValueTask CompleteOneAsync()
            {
                using (await gate.LockAsync().ConfigureAwait(false))
                {
                    active--;

                    // Only the last sequence to finish propagates completion downstream.
                    if (active == 0)
                    {
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                }
            }

            var outerObserver = Create<TSource>(
                async outerItem =>
                {
                    IAsyncObservable<TCollection> collection;

                    try
                    {
                        collection = await collectionSelector(outerItem).ConfigureAwait(false);
                    }
                    catch (Exception error)
                    {
                        await ForwardErrorAsync(error).ConfigureAwait(false);
                        return;
                    }

                    // Register the new inner sequence before subscribing to it.
                    using (await gate.LockAsync().ConfigureAwait(false))
                    {
                        active++;
                    }

                    // The placeholder is added to the composite first; the real subscription is assigned
                    // once SubscribeSafeAsync has returned.
                    var innerSubscription = new SingleAssignmentAsyncDisposable();
                    await innerSubscriptions.AddAsync(innerSubscription).ConfigureAwait(false);

                    var innerObserver = Create<TCollection>(
                        async innerItem =>
                        {
                            TResult result;

                            try
                            {
                                result = await resultSelector(outerItem, innerItem).ConfigureAwait(false);
                            }
                            catch (Exception error)
                            {
                                await ForwardErrorAsync(error).ConfigureAwait(false);
                                return;
                            }

                            using (await gate.LockAsync().ConfigureAwait(false))
                            {
                                await observer.OnNextAsync(result).ConfigureAwait(false);
                            }
                        },
                        ForwardErrorAsync,
                        async () =>
                        {
                            await CompleteOneAsync().ConfigureAwait(false);
                            await innerSubscriptions.RemoveAsync(innerSubscription).ConfigureAwait(false);
                        });

                    var subscription = await collection.SubscribeSafeAsync(innerObserver).ConfigureAwait(false);
                    await innerSubscription.AssignAsync(subscription).ConfigureAwait(false);
                },
                ForwardErrorAsync,
                CompleteOneAsync);

            return (outerObserver, innerSubscriptions);
        }

        /// <summary>
        /// Builds the observer for the outer sequence of an indexed SelectMany whose inner elements are
        /// forwarded unchanged. Delegates to the asynchronous indexed collection/result selector overload.
        /// </summary>
        /// <param name="observer">The observer receiving the inner elements.</param>
        /// <param name="selector">Produces the inner sequence for an outer element and its index.</param>
        /// <returns>The outer observer and the disposable tracking the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is null.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, IAsyncObservable<TResult>> selector)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (selector is null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            return SelectMany<TSource, TResult, TResult>(
                observer,
                (outerItem, outerIndex) => new ValueTask<IAsyncObservable<TResult>>(selector(outerItem, outerIndex)),
                (outerItem, outerIndex, innerItem, innerIndex) => new ValueTask<TResult>(innerItem));
        }

        /// <summary>
        /// Builds the observer for the outer sequence of an indexed SelectMany with an asynchronous selector
        /// whose inner elements are forwarded unchanged. Delegates to the asynchronous indexed
        /// collection/result selector overload.
        /// </summary>
        /// <param name="observer">The observer receiving the inner elements.</param>
        /// <param name="selector">Asynchronously produces the inner sequence for an outer element and its index.</param>
        /// <returns>The outer observer and the disposable tracking the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is null.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, ValueTask<IAsyncObservable<TResult>>> selector)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (selector is null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            return SelectMany<TSource, TResult, TResult>(
                observer,
                selector,
                (outerItem, outerIndex, innerItem, innerIndex) => new ValueTask<TResult>(innerItem));
        }

        /// <summary>
        /// Builds the observer for the outer sequence of an indexed SelectMany with synchronous selectors by
        /// wrapping both selectors in <see cref="ValueTask{TResult}"/> and delegating to the asynchronous
        /// indexed overload.
        /// </summary>
        /// <param name="observer">The observer receiving the combined values.</param>
        /// <param name="collectionSelector">Produces the inner sequence for an outer element and its index.</param>
        /// <param name="resultSelector">Combines an indexed outer element with an indexed inner element.</param>
        /// <returns>The outer observer and the disposable tracking the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (collectionSelector is null)
            {
                throw new ArgumentNullException(nameof(collectionSelector));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            return SelectMany<TSource, TCollection, TResult>(
                observer,
                (outerItem, outerIndex) => new ValueTask<IAsyncObservable<TCollection>>(collectionSelector(outerItem, outerIndex)),
                (outerItem, outerIndex, innerItem, innerIndex) => new ValueTask<TResult>(resultSelector(outerItem, outerIndex, innerItem, innerIndex)));
        }

        /// <summary>
        /// Indexed SelectMany implementation: pairs outer and inner elements with their indices via Select and
        /// reuses the non-indexed asynchronous overload on the resulting (item, index) tuples.
        /// </summary>
        /// <param name="observer">The observer receiving the combined values.</param>
        /// <param name="collectionSelector">Asynchronously produces the inner sequence for an outer element and its index.</param>
        /// <param name="resultSelector">Asynchronously combines an indexed outer element with an indexed inner element.</param>
        /// <returns>The outer observer and the disposable tracking the inner subscriptions.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, int, TCollection, int, ValueTask<TResult>> resultSelector)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (collectionSelector is null)
            {
                throw new ArgumentNullException(nameof(collectionSelector));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            // Obtains the inner sequence for an indexed outer element and tags each inner element with its index.
            async ValueTask<IAsyncObservable<(TCollection item, int i)>> SelectIndexedCollectionAsync((TSource item, int i) outer)
            {
                var collection = await collectionSelector(outer.item, outer.i).ConfigureAwait(false);

                return collection.Select((item, i) => (item, i));
            }

            // Unpacks both (item, index) tuples and hands them to the caller's result selector.
            ValueTask<TResult> SelectIndexedResult((TSource item, int i) outer, (TCollection item, int i) inner)
            {
                return resultSelector(outer.item, outer.i, inner.item, inner.i);
            }

            var (indexedOuterObserver, innerSubscriptions) = SelectMany(observer, SelectIndexedCollectionAsync, (Func<(TSource item, int i), (TCollection item, int i), ValueTask<TResult>>)SelectIndexedResult);

            // Adapt the tuple-based observer so that it accepts plain outer elements.
            var plainOuterObserver = Select<TSource, (TSource item, int i)>(indexedOuterObserver, (item, i) => (item, i));

            return (plainOuterObserver, innerSubscriptions);
        }
    }
}
 |