浏览代码

Adding SubscribeSafeAsync extension method.

Bart De Smet 8 年之前
父节点
当前提交
04d9b54fb1
共有 1 个文件被更改,包括 25 次插入0 次删除
  1. 25 0
      AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/AsyncObservable.cs

+ 25 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/AsyncObservable.cs

@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Reactive.Disposables;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -30,6 +31,30 @@ namespace System.Reactive.Linq
             return source.SubscribeAsync(AsyncObserver.Create(onNextAsync, onErrorAsync, onCompletedAsync));
         }
 
+        public static Task<IAsyncDisposable> SubscribeSafeAsync<T>(this IAsyncObservable<T> source, IAsyncObserver<T> observer)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            return CoreAsync();
+
+            async Task<IAsyncDisposable> CoreAsync()
+            {
+                try
+                {
+                    return await source.SubscribeAsync(observer);
+                }
+                catch (Exception ex)
+                {
+                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
+
+                    return AsyncDisposable.Nop;
+                }
+            }
+        }
+
         private sealed class AnonymousAsyncObservable<T> : AsyncObservableBase<T>
         {
             private readonly Func<IAsyncObserver<T>, Task<IAsyncDisposable>> _subscribeAsync;