浏览代码

Adding a SubscribeAsync extension method.

Bart De Smet 8 年之前
父节点
当前提交
a191068d82
共有 1 个文件被更改,包括 14 次插入0 次删除
  1. 14 0
      AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/AsyncObservable.cs

+ 14 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/AsyncObservable.cs

@@ -16,6 +16,20 @@ namespace System.Reactive.Linq
             return new AnonymousAsyncObservable<T>(subscribeAsync);
         }
 
+        public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (onNextAsync == null)
+                throw new ArgumentNullException(nameof(onNextAsync));
+            if (onErrorAsync == null)
+                throw new ArgumentNullException(nameof(onErrorAsync));
+            if (onCompletedAsync == null)
+                throw new ArgumentNullException(nameof(onCompletedAsync));
+
+            return source.SubscribeAsync(AsyncObserver.Create(onNextAsync, onErrorAsync, onCompletedAsync));
+        }
+
         private sealed class AnonymousAsyncObservable<T> : AsyncObservableBase<T>
         {
             private readonly Func<IAsyncObserver<T>, Task<IAsyncDisposable>> _subscribeAsync;