| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the MIT License.// See the LICENSE file in the project root for more information. using System.Reactive.Disposables;using System.Threading;using System.Threading.Tasks;namespace System.Reactive.Linq{    public partial class AsyncObservable    {        public static Task ForEachAsync<TSource>(this IAsyncObservable<TSource> source, Action<TSource> onNext, CancellationToken token = default)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNext == null)                throw new ArgumentNullException(nameof(onNext));            return ForEachAsyncCore(source, (x, i) => { onNext(x); return Task.CompletedTask; }, token);        }        public static Task ForEachAsync<TSource>(this IAsyncObservable<TSource> source, Func<TSource, Task> onNext, CancellationToken token = default)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNext == null)                throw new ArgumentNullException(nameof(onNext));            return ForEachAsyncCore(source, (x, i) => onNext(x), token);        }        public static Task ForEachAsync<TSource>(this IAsyncObservable<TSource> source, Action<TSource, int> onNext, CancellationToken token = default)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNext == null)                throw new ArgumentNullException(nameof(onNext));            return ForEachAsyncCore(source, (x, i) => { onNext(x, i); return Task.CompletedTask; }, token);        }        public static Task ForEachAsync<TSource>(this IAsyncObservable<TSource> source, Func<TSource, int, Task> onNext, CancellationToken token = default)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNext == null)                throw new ArgumentNullException(nameof(onNext));            return ForEachAsyncCore(source, onNext, token);        }        private static async Task ForEachAsyncCore<TSource>(IAsyncObservable<TSource> source, Func<TSource, int, Task> onNext, CancellationToken token)        {            token.ThrowIfCancellationRequested();            var tcs = new TaskCompletionSource<object>();            var subscription = new SingleAssignmentAsyncDisposable();            using (token.Register(() =>            {                tcs.TrySetCanceled(token);                subscription.DisposeAsync().AsTask().ContinueWith(t =>                {                    if (t.Exception != null)                    {                        // TODO: Trace?                    }                });            }))            {                var i = 0;                var o = AsyncObserver.Create<TSource>(                    async x =>                    {                        try                        {                            await onNext(x, checked(i++)).ConfigureAwait(false);                        }                        catch (Exception ex)                        {                            try                            {                                tcs.TrySetException(ex);                            }                            finally                            {                                await subscription.DisposeAsync().ConfigureAwait(false);                            }                        }                    },                    async ex =>                    {                        try                        {                            tcs.TrySetException(ex);                        }                        finally                        {                            await subscription.DisposeAsync().ConfigureAwait(false);                        }                    },                    async () =>                    {                        try                        {                            tcs.TrySetResult(null);                        }                        finally                        {                            await subscription.DisposeAsync().ConfigureAwait(false);                        }                    }                );                //                // NB: If any of the lines below throw, the result will go into the Task returned from the async method.                //     There's also no need to use SubscribeSafeAsync here; the exception will propagate just fine.                //                var d = await source.SubscribeAsync(o).ConfigureAwait(false);                await subscription.AssignAsync(d).ConfigureAwait(false);                await tcs.Task.ConfigureAwait(false);            }        }    }}
 |