// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    public static partial class AsyncObservable
    {
        /// <summary>
        /// Factory for observables whose subscription logic is a delegate that receives a
        /// captured source sequence (and optionally a captured state value) together with
        /// the subscribing observer.
        /// </summary>
        /// <typeparam name="TResult">Element type of the observable that is produced.</typeparam>
        /// <typeparam name="TSource">Element type of the captured source sequence.</typeparam>
        // NOTE(review): the type-parameter names and their order (TResult, TSource) were
        // reconstructed; confirm they match every call site elsewhere in the project.
        internal static class CreateAsyncObservable<TResult, TSource>
        {
            /// <summary>
            /// Observable that forwards each subscription to a delegate, passing along the
            /// captured source sequence and the subscribing observer.
            /// </summary>
            private sealed class AsyncObservableImpl : AsyncObservableBase<TResult>
            {
                private readonly IAsyncObservable<TSource> _source;
                private readonly Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> _subscribeAsync;

                /// <summary>Captures the source sequence and the subscribe delegate.</summary>
                /// <exception cref="ArgumentNullException">
                /// <paramref name="source"/> or <paramref name="subscribeAsync"/> is null.
                /// </exception>
                public AsyncObservableImpl(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
                {
                    _source = source ?? throw new ArgumentNullException(nameof(source));
                    _subscribeAsync = subscribeAsync ?? throw new ArgumentNullException(nameof(subscribeAsync));
                }

                /// <summary>
                /// Invokes the captured delegate with the captured source and <paramref name="observer"/>.
                /// </summary>
                /// <returns>Whatever the captured delegate returns.</returns>
                /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
                protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<TResult> observer)
                {
                    if (observer == null)
                        throw new ArgumentNullException(nameof(observer));

                    return _subscribeAsync(_source, observer);
                }
            }

            /// <summary>
            /// Same as the non-generic implementation, but additionally hands a captured state
            /// value to the delegate on every subscription.
            /// </summary>
            /// <typeparam name="TState">Type of the captured state value.</typeparam>
            private sealed class AsyncObservableImpl<TState> : AsyncObservableBase<TResult>
            {
                private readonly TState _state;
                private readonly IAsyncObservable<TSource> _source;
                private readonly Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> _subscribeAsync;

                /// <summary>Captures the source sequence, the state value and the subscribe delegate.</summary>
                /// <remarks><paramref name="state"/> is stored as-is and is not null-checked.</remarks>
                /// <exception cref="ArgumentNullException">
                /// <paramref name="source"/> or <paramref name="subscribeAsync"/> is null.
                /// </exception>
                public AsyncObservableImpl(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
                {
                    _state = state;
                    _source = source ?? throw new ArgumentNullException(nameof(source));
                    _subscribeAsync = subscribeAsync ?? throw new ArgumentNullException(nameof(subscribeAsync));
                }

                /// <summary>
                /// Invokes the captured delegate with the captured source, the captured state
                /// and <paramref name="observer"/>.
                /// </summary>
                /// <returns>Whatever the captured delegate returns.</returns>
                /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
                protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<TResult> observer)
                {
                    if (observer == null)
                        throw new ArgumentNullException(nameof(observer));

                    return _subscribeAsync(_source, _state, observer);
                }
            }

            /// <summary>
            /// Creates an observable that runs <paramref name="subscribeAsync"/> against
            /// <paramref name="source"/> for each subscription.
            /// </summary>
            /// <exception cref="ArgumentNullException">
            /// <paramref name="source"/> or <paramref name="subscribeAsync"/> is null.
            /// </exception>
            public static IAsyncObservable<TResult> From(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
            {
                return new AsyncObservableImpl(source, subscribeAsync);
            }

            /// <summary>
            /// Creates an observable that runs <paramref name="subscribeAsync"/> against
            /// <paramref name="source"/> and <paramref name="state"/> for each subscription.
            /// </summary>
            /// <typeparam name="TState">Type of the state value passed to the delegate.</typeparam>
            /// <exception cref="ArgumentNullException">
            /// <paramref name="source"/> or <paramref name="subscribeAsync"/> is null.
            /// </exception>
            public static IAsyncObservable<TResult> From<TState>(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
            {
                return new AsyncObservableImpl<TState>(source, state, subscribeAsync);
            }
        }

        /// <summary>
        /// Wraps the given subscribe delegate in an <c>AsyncObservable</c> instance.
        /// </summary>
        /// <typeparam name="T">Element type of the resulting sequence.</typeparam>
        /// <param name="subscribeAsync">Delegate that receives the subscribing observer.</param>
        /// <returns>The newly created observable.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="subscribeAsync"/> is null.</exception>
        public static IAsyncObservable<T> Create<T>(Func<IAsyncObserver<T>, ValueTask<IAsyncDisposable>> subscribeAsync)
        {
            if (subscribeAsync == null)
                throw new ArgumentNullException(nameof(subscribeAsync));

            return new AsyncObservable<T>(subscribeAsync);
        }

        /// <summary>
        /// Creates an observable whose subscribe delegate is handed <paramref name="source"/>
        /// in addition to the observer.
        /// </summary>
        /// <typeparam name="TSource">Element type of <paramref name="source"/>.</typeparam>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="subscribeAsync"/> is null.
        /// </exception>
        internal static IAsyncObservable<TResult> Create<TSource, TResult>(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
        {
            return CreateAsyncObservable<TResult, TSource>.From(source, subscribeAsync);
        }

        /// <summary>
        /// Creates an observable whose subscribe delegate is handed <paramref name="source"/>
        /// and <paramref name="state"/> in addition to the observer.
        /// </summary>
        /// <typeparam name="TSource">Element type of <paramref name="source"/>.</typeparam>
        /// <typeparam name="TState">Type of <paramref name="state"/>.</typeparam>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="subscribeAsync"/> is null.
        /// </exception>
        internal static IAsyncObservable<TResult> Create<TSource, TState, TResult>(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
        {
            return CreateAsyncObservable<TResult, TSource>.From(source, state, subscribeAsync);
        }

        /// <summary>
        /// Convenience overload for the case where source and result share one element type;
        /// forwards to the two-type-parameter overload.
        /// </summary>
        /// <typeparam name="T">Element type of both <paramref name="source"/> and the result.</typeparam>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="subscribeAsync"/> is null.
        /// </exception>
        internal static IAsyncObservable<T> Create<T>(IAsyncObservable<T> source, Func<IAsyncObservable<T>, IAsyncObserver<T>, ValueTask<IAsyncDisposable>> subscribeAsync)
        {
            // Explicit type arguments select the <TSource, TResult> overload rather than this one.
            return Create<T, T>(source, subscribeAsync);
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to <paramref name="source"/>, routing any
        /// exception thrown while subscribing to the observer's <c>OnErrorAsync</c>.
        /// </summary>
        /// <typeparam name="T">Element type of the sequence.</typeparam>
        /// <param name="source">Sequence to subscribe to.</param>
        /// <param name="observer">Observer that receives the subscription or the failure.</param>
        /// <returns>
        /// The result of <c>source.SubscribeAsync(observer)</c>, or <c>AsyncDisposable.Nop</c>
        /// when subscribing threw and the exception was forwarded to the observer.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="observer"/> is null.
        /// </exception>
        public static ValueTask<IAsyncDisposable> SubscribeSafeAsync<T>(this IAsyncObservable<T> source, IAsyncObserver<T> observer)
        {
            // Argument checks live outside the async local function so they throw to the
            // caller immediately rather than surfacing through the returned task.
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            return CoreAsync();

            async ValueTask<IAsyncDisposable> CoreAsync()
            {
                try
                {
                    return await source.SubscribeAsync(observer).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // An exception thrown by OnErrorAsync itself is not caught here and
                    // propagates through the returned task.
                    await observer.OnErrorAsync(ex).ConfigureAwait(false);

                    return AsyncDisposable.Nop;
                }
            }
        }
    }
}