123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Reactive.Disposables;
- using System.Threading.Tasks;
- namespace System.Reactive.Linq
- {
- public static partial class AsyncObservable
- {
- internal static class CreateAsyncObservable<TResult>
- {
- private sealed class AsyncObservableImpl<TSource> : AsyncObservableBase<TResult>
- {
- private readonly IAsyncObservable<TSource> _source;
- private readonly Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> _subscribeAsync;
- public AsyncObservableImpl(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
- {
- _source = source ?? throw new ArgumentNullException(nameof(source));
- _subscribeAsync = subscribeAsync ?? throw new ArgumentNullException(nameof(subscribeAsync));
- }
- protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<TResult> observer)
- {
- if (observer == null)
- throw new ArgumentNullException(nameof(observer));
- return _subscribeAsync(_source, observer);
- }
- }
- private sealed class AsyncObservableImpl<TSource, TState> : AsyncObservableBase<TResult>
- {
- private readonly TState _state;
- private readonly IAsyncObservable<TSource> _source;
- private readonly Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> _subscribeAsync;
- public AsyncObservableImpl(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
- {
- _state = state;
- _source = source ?? throw new ArgumentNullException(nameof(source));
- _subscribeAsync = subscribeAsync ?? throw new ArgumentNullException(nameof(subscribeAsync));
- }
- protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<TResult> observer)
- {
- if (observer == null)
- throw new ArgumentNullException(nameof(observer));
- return _subscribeAsync(_source, _state, observer);
- }
- }
- public static IAsyncObservable<TResult> From<TSource>(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
- {
- return new AsyncObservableImpl<TSource>(source, subscribeAsync);
- }
- public static IAsyncObservable<TResult> From<TSource, TState>(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
- {
- return new AsyncObservableImpl<TSource, TState>(source, state, subscribeAsync);
- }
- }
- public static IAsyncObservable<T> Create<T>(Func<IAsyncObserver<T>, ValueTask<IAsyncDisposable>> subscribeAsync)
- {
- if (subscribeAsync == null)
- throw new ArgumentNullException(nameof(subscribeAsync));
- return new AsyncObservable<T>(subscribeAsync);
- }
- internal static IAsyncObservable<TResult> Create<TSource, TResult>(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
- {
- return CreateAsyncObservable<TResult>.From(source, subscribeAsync);
- }
- internal static IAsyncObservable<TSource> Create<TSource, TState>(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TSource>, ValueTask<IAsyncDisposable>> subscribeAsync)
- {
- return CreateAsyncObservable<TSource>.From(source, state, subscribeAsync);
- }
- internal static IAsyncObservable<TSource> Create<TSource>(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TSource>, ValueTask<IAsyncDisposable>> subscribeAsync)
- {
- return Create<TSource, TSource>(source, subscribeAsync);
- }
- public static ValueTask<IAsyncDisposable> SubscribeSafeAsync<T>(this IAsyncObservable<T> source, IAsyncObserver<T> observer)
- {
- if (source == null)
- throw new ArgumentNullException(nameof(source));
- if (observer == null)
- throw new ArgumentNullException(nameof(observer));
- return CoreAsync();
- async ValueTask<IAsyncDisposable> CoreAsync()
- {
- try
- {
- return await source.SubscribeAsync(observer).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
- await observer.OnErrorAsync(ex).ConfigureAwait(false);
- return AsyncDisposable.Nop;
- }
- }
- }
- }
- }
|