| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the MIT License.// See the LICENSE file in the project root for more information. using System.Reactive;using System.Threading.Tasks;namespace System{    public static class AsyncObservableExtensions    {        public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNextAsync == null)                throw new ArgumentNullException(nameof(onNextAsync));            return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => new ValueTask(Task.FromException(ex)), () => default));        }        public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync, Func<Exception, ValueTask> onErrorAsync)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNextAsync == null)                throw new ArgumentNullException(nameof(onNextAsync));            if (onErrorAsync == null)                throw new ArgumentNullException(nameof(onErrorAsync));            return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, () => default));        }        public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync, Func<ValueTask> onCompletedAsync)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNextAsync == null)                throw new ArgumentNullException(nameof(onNextAsync));            if (onCompletedAsync == null)                throw new ArgumentNullException(nameof(onCompletedAsync));            return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => new ValueTask(Task.FromException(ex)), onCompletedAsync));        }        public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync, Func<Exception, ValueTask> onErrorAsync, Func<ValueTask> onCompletedAsync)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNextAsync == null)                throw new ArgumentNullException(nameof(onNextAsync));            if (onErrorAsync == null)                throw new ArgumentNullException(nameof(onErrorAsync));            if (onCompletedAsync == null)                throw new ArgumentNullException(nameof(onCompletedAsync));            return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync));        }        public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNext == null)                throw new ArgumentNullException(nameof(onNext));            return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => new ValueTask(Task.FromException(ex)), () => default));        }        public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action<Exception> onError)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNext == null)                throw new ArgumentNullException(nameof(onNext));            if (onError == null)                throw new ArgumentNullException(nameof(onError));            return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => { onError(ex); return default; }, () => default));        }        public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action onCompleted)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNext == null)                throw new ArgumentNullException(nameof(onNext));            if (onCompleted == null)                throw new ArgumentNullException(nameof(onCompleted));            return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => new ValueTask(Task.FromException(ex)), () => { onCompleted(); return default; }));        }        public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (onNext == null)                throw new ArgumentNullException(nameof(onNext));            if (onError == null)                throw new ArgumentNullException(nameof(onError));            if (onCompleted == null)                throw new ArgumentNullException(nameof(onCompleted));            return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => { onError(ex); return default; }, () => { onCompleted(); return default; }));        }    }}
 |