123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Reactive;
- using System.Threading.Tasks;
- namespace System
- {
- public static class AsyncObservableExtensions
- {
- public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync)
- {
- if (source == null)
- throw new ArgumentNullException(nameof(source));
- if (onNextAsync == null)
- throw new ArgumentNullException(nameof(onNextAsync));
- return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => new ValueTask(Task.FromException(ex)), () => default));
- }
- public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync, Func<Exception, ValueTask> onErrorAsync)
- {
- if (source == null)
- throw new ArgumentNullException(nameof(source));
- if (onNextAsync == null)
- throw new ArgumentNullException(nameof(onNextAsync));
- if (onErrorAsync == null)
- throw new ArgumentNullException(nameof(onErrorAsync));
- return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, () => default));
- }
- public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync, Func<ValueTask> onCompletedAsync)
- {
- if (source == null)
- throw new ArgumentNullException(nameof(source));
- if (onNextAsync == null)
- throw new ArgumentNullException(nameof(onNextAsync));
- if (onCompletedAsync == null)
- throw new ArgumentNullException(nameof(onCompletedAsync));
- return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => new ValueTask(Task.FromException(ex)), onCompletedAsync));
- }
- public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync, Func<Exception, ValueTask> onErrorAsync, Func<ValueTask> onCompletedAsync)
- {
- if (source == null)
- throw new ArgumentNullException(nameof(source));
- if (onNextAsync == null)
- throw new ArgumentNullException(nameof(onNextAsync));
- if (onErrorAsync == null)
- throw new ArgumentNullException(nameof(onErrorAsync));
- if (onCompletedAsync == null)
- throw new ArgumentNullException(nameof(onCompletedAsync));
- return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync));
- }
- public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext)
- {
- if (source == null)
- throw new ArgumentNullException(nameof(source));
- if (onNext == null)
- throw new ArgumentNullException(nameof(onNext));
- return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => new ValueTask(Task.FromException(ex)), () => default));
- }
- public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action<Exception> onError)
- {
- if (source == null)
- throw new ArgumentNullException(nameof(source));
- if (onNext == null)
- throw new ArgumentNullException(nameof(onNext));
- if (onError == null)
- throw new ArgumentNullException(nameof(onError));
- return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => { onError(ex); return default; }, () => default));
- }
- public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action onCompleted)
- {
- if (source == null)
- throw new ArgumentNullException(nameof(source));
- if (onNext == null)
- throw new ArgumentNullException(nameof(onNext));
- if (onCompleted == null)
- throw new ArgumentNullException(nameof(onCompleted));
- return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => new ValueTask(Task.FromException(ex)), () => { onCompleted(); return default; }));
- }
- public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- if (source == null)
- throw new ArgumentNullException(nameof(source));
- if (onNext == null)
- throw new ArgumentNullException(nameof(onNext));
- if (onError == null)
- throw new ArgumentNullException(nameof(onError));
- if (onCompleted == null)
- throw new ArgumentNullException(nameof(onCompleted));
- return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => { onError(ex); return default; }, () => { onCompleted(); return default; }));
- }
- }
- }
|