|
@@ -9,6 +9,40 @@ namespace System
|
|
|
{
|
|
{
|
|
|
public static class AsyncObservableExtensions
|
|
public static class AsyncObservableExtensions
|
|
|
{
|
|
{
|
|
|
|
|
+ public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (source == null)
|
|
|
|
|
+ throw new ArgumentNullException(nameof(source));
|
|
|
|
|
+ if (onNextAsync == null)
|
|
|
|
|
+ throw new ArgumentNullException(nameof(onNextAsync));
|
|
|
|
|
+
|
|
|
|
|
+ return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => Task.FromException(ex), () => Task.CompletedTask));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (source == null)
|
|
|
|
|
+ throw new ArgumentNullException(nameof(source));
|
|
|
|
|
+ if (onNextAsync == null)
|
|
|
|
|
+ throw new ArgumentNullException(nameof(onNextAsync));
|
|
|
|
|
+ if (onErrorAsync == null)
|
|
|
|
|
+ throw new ArgumentNullException(nameof(onErrorAsync));
|
|
|
|
|
+
|
|
|
|
|
+ return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, () => Task.CompletedTask));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync, Func<Task> onCompletedAsync)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (source == null)
|
|
|
|
|
+ throw new ArgumentNullException(nameof(source));
|
|
|
|
|
+ if (onNextAsync == null)
|
|
|
|
|
+ throw new ArgumentNullException(nameof(onNextAsync));
|
|
|
|
|
+ if (onCompletedAsync == null)
|
|
|
|
|
+ throw new ArgumentNullException(nameof(onCompletedAsync));
|
|
|
|
|
+
|
|
|
|
|
+ return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => Task.FromException(ex), onCompletedAsync));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)
|
|
public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)
|
|
|
{
|
|
{
|
|
|
if (source == null)
|
|
if (source == null)
|