| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System.Runtime.ExceptionServices;using System.Threading.Tasks;namespace System.Reactive.Linq{    public static partial class AsyncObserver    {        public static IAsyncObserver<T> Create<T>(Func<T, Task> onNextAsync)        {            if (onNextAsync == null)                throw new ArgumentNullException(nameof(onNextAsync));            return new AnonymousAsyncObserver<T>(                onNextAsync,                ex =>                {                    ExceptionDispatchInfo.Capture(ex).Throw();                    return Task.CompletedTask;                },                () => Task.CompletedTask            );        }        public static IAsyncObserver<T> Create<T>(Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)        {            if (onNextAsync == null)                throw new ArgumentNullException(nameof(onNextAsync));            if (onErrorAsync == null)                throw new ArgumentNullException(nameof(onErrorAsync));            if (onCompletedAsync == null)                throw new ArgumentNullException(nameof(onCompletedAsync));            return new AnonymousAsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync);        }        private sealed class AnonymousAsyncObserver<T> : AsyncObserverBase<T>        {            private readonly Func<T, Task> _onNextAsync;            private readonly Func<Exception, Task> _onErrorAsync;            private readonly Func<Task> _onCompletedAsync;            public AnonymousAsyncObserver(Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)            {                _onNextAsync = onNextAsync;                _onErrorAsync = onErrorAsync;                _onCompletedAsync = onCompletedAsync;            }            protected override Task OnCompletedAsyncCore() => _onCompletedAsync();            protected override Task OnErrorAsyncCore(Exception error) => _onErrorAsync(error);            protected override Task OnNextAsyncCore(T value) => _onNextAsync(value);        }    }}
 |