Преглед на файлове

Adding SubscribeAsync extension methods with synchronous callbacks.

Bart De Smet преди 8 години
родител
ревизия
48d254d9b7
променени са 1 файла, в които са добавени 48 реда и са изтрити 0 реда
  1. 48 0
      AsyncRx.NET/System.Reactive.Async.Core/System/AsyncObservableExtensions.cs

+ 48 - 0
AsyncRx.NET/System.Reactive.Async.Core/System/AsyncObservableExtensions.cs

@@ -56,5 +56,53 @@ namespace System
 
             return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync));
         }
+
+        public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (onNext == null)
+                throw new ArgumentNullException(nameof(onNext));
+
+            return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return Task.CompletedTask; }, ex => Task.FromException(ex), () => Task.CompletedTask));
+        }
+
+        public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action<Exception> onError)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (onNext == null)
+                throw new ArgumentNullException(nameof(onNext));
+            if (onError == null)
+                throw new ArgumentNullException(nameof(onError));
+
+            return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return Task.CompletedTask; }, ex => { onError(ex); return Task.CompletedTask; }, () => Task.CompletedTask));
+        }
+
+        public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action onCompleted)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (onNext == null)
+                throw new ArgumentNullException(nameof(onNext));
+            if (onCompleted == null)
+                throw new ArgumentNullException(nameof(onCompleted));
+
+            return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return Task.CompletedTask; }, ex => Task.FromException(ex), () => { onCompleted(); return Task.CompletedTask; }));
+        }
+
+        public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (onNext == null)
+                throw new ArgumentNullException(nameof(onNext));
+            if (onError == null)
+                throw new ArgumentNullException(nameof(onError));
+            if (onCompleted == null)
+                throw new ArgumentNullException(nameof(onCompleted));
+
+            return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return Task.CompletedTask; }, ex => { onError(ex); return Task.CompletedTask; }, () => { onCompleted(); return Task.CompletedTask; }));
+        }
     }
 }