| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the MIT License.// See the LICENSE file in the project root for more information. using System.Collections.Generic;using System.Reactive.Disposables;using System.Threading.Tasks;namespace System.Reactive.Linq{    // TODO: Implement tail call behavior to flatten Concat chains.    public partial class AsyncObservable    {        public static IAsyncObservable<TSource> Concat<TSource>(this IAsyncObservable<TSource> first, IAsyncObservable<TSource> second)        {            if (first == null)                throw new ArgumentNullException(nameof(first));            if (second == null)                throw new ArgumentNullException(nameof(second));            return Create(                first,                second,                static async (first, second, observer) =>                {                    var (sink, inner) = AsyncObserver.Concat(observer, second);                    var subscription = await first.SubscribeSafeAsync(sink).ConfigureAwait(false);                    return StableCompositeAsyncDisposable.Create(subscription, inner);                });        }        public static IAsyncObservable<TSource> Concat<TSource>(params IAsyncObservable<TSource>[] sources) => Concat((IEnumerable<IAsyncObservable<TSource>>)sources);        public static IAsyncObservable<TSource> Concat<TSource>(this IEnumerable<IAsyncObservable<TSource>> sources)        {            if (sources == null)                throw new ArgumentNullException(nameof(sources));            return Create<TSource>(async observer =>            {                var enumerator = sources.GetEnumerator();                if (!enumerator.MoveNext())                {                    return AsyncDisposable.Nop; // REVIEW: Is Never behavior right here?                }                var source = enumerator.Current;                var (sink, inner) = AsyncObserver.Concat(observer, enumerator);                var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);                return StableCompositeAsyncDisposable.Create(subscription, inner);            });        }    }    public partial class AsyncObserver    {        public static (IAsyncObserver<TSource>, IAsyncDisposable) Concat<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> second)        {            if (observer == null)                throw new ArgumentNullException(nameof(observer));            if (second == null)                throw new ArgumentNullException(nameof(second));            var subscription = new SingleAssignmentAsyncDisposable();            var sink = Create<TSource>(                observer.OnNextAsync,                observer.OnErrorAsync,                async () =>                {                    var secondSubscription = await second.SubscribeSafeAsync(observer).ConfigureAwait(false);                    await subscription.AssignAsync(secondSubscription).ConfigureAwait(false);                }            );            return (sink, subscription);        }        public static (IAsyncObserver<TSource>, IAsyncDisposable) Concat<TSource>(IAsyncObserver<TSource> observer, IEnumerator<IAsyncObservable<TSource>> handlers)        {            if (observer == null)                throw new ArgumentNullException(nameof(observer));            if (handlers == null)                throw new ArgumentNullException(nameof(handlers));            var innerSubscription = new SerialAsyncDisposable();            IAsyncObserver<TSource> GetSink() =>                Create<TSource>(                    observer.OnNextAsync,                    observer.OnErrorAsync,                    async () =>                    {                        var next = default(IAsyncObservable<TSource>);                        try                        {                            if (handlers.MoveNext())                            {                                next = handlers.Current;                            }                        }                        catch (Exception err)                        {                            await observer.OnErrorAsync(err).ConfigureAwait(false);                            return;                        }                        if (next == null)                        {                            await observer.OnCompletedAsync().ConfigureAwait(false); // REVIEW: Is Empty behavior right here?                            return;                        }                        var nextSubscription = await next.SubscribeSafeAsync(GetSink()).ConfigureAwait(false);                        await innerSubscription.AssignAsync(nextSubscription).ConfigureAwait(false);                    }                );            var disposeEnumerator = AsyncDisposable.Create(() =>            {                handlers.Dispose();                return default;            });            var subscription = StableCompositeAsyncDisposable.Create(innerSubscription, disposeEnumerator);            return (GetSink(), subscription);        }    }}
 |