Browse Source

Using SubscribeSafeAsync in a few more places.

Bart De Smet 8 years ago
parent
commit
b39ef0a310

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Delay.cs

@@ -23,7 +23,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, drain) = await AsyncObserver.Delay(observer, dueTime).ConfigureAwait(false);
 
-                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+                var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
 
                 return StableCompositeAsyncDisposable.Create(subscription, drain);
             });
@@ -40,7 +40,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, drain) = await AsyncObserver.Delay(observer, dueTime, scheduler).ConfigureAwait(false);
 
-                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+                var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
 
                 return StableCompositeAsyncDisposable.Create(subscription, drain);
             });

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/ObserveOn.cs

@@ -23,7 +23,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, drain) = await AsyncObserver.ObserveOn(observer, scheduler).ConfigureAwait(false);
 
-                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+                var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
 
                 return StableCompositeAsyncDisposable.Create(subscription, drain);
             });

+ 8 - 8
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/SelectMany.cs

@@ -21,7 +21,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
 
-                var subscription = await source.SubscribeAsync(sink);
+                var subscription = await source.SubscribeSafeAsync(sink);
 
                 return StableCompositeAsyncDisposable.Create(subscription, inner);
             });
@@ -38,7 +38,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
 
-                var subscription = await source.SubscribeAsync(sink);
+                var subscription = await source.SubscribeSafeAsync(sink);
 
                 return StableCompositeAsyncDisposable.Create(subscription, inner);
             });
@@ -57,7 +57,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector);
 
-                var subscription = await source.SubscribeAsync(sink);
+                var subscription = await source.SubscribeSafeAsync(sink);
 
                 return StableCompositeAsyncDisposable.Create(subscription, inner);
             });
@@ -76,7 +76,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector);
 
-                var subscription = await source.SubscribeAsync(sink);
+                var subscription = await source.SubscribeSafeAsync(sink);
 
                 return StableCompositeAsyncDisposable.Create(subscription, inner);
             });
@@ -93,7 +93,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
 
-                var subscription = await source.SubscribeAsync(sink);
+                var subscription = await source.SubscribeSafeAsync(sink);
 
                 return StableCompositeAsyncDisposable.Create(subscription, inner);
             });
@@ -110,7 +110,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
 
-                var subscription = await source.SubscribeAsync(sink);
+                var subscription = await source.SubscribeSafeAsync(sink);
 
                 return StableCompositeAsyncDisposable.Create(subscription, inner);
             });
@@ -129,7 +129,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector);
 
-                var subscription = await source.SubscribeAsync(sink);
+                var subscription = await source.SubscribeSafeAsync(sink);
 
                 return StableCompositeAsyncDisposable.Create(subscription, inner);
             });
@@ -148,7 +148,7 @@ namespace System.Reactive.Linq
             {
                 var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector);
 
-                var subscription = await source.SubscribeAsync(sink);
+                var subscription = await source.SubscribeSafeAsync(sink);
 
                 return StableCompositeAsyncDisposable.Create(subscription, inner);
             });

+ 5 - 5
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Timeout.cs

@@ -22,7 +22,7 @@ namespace System.Reactive.Linq
 
                 var (sink, disposable) = await AsyncObserver.Timeout(observer, sourceSubscription, dueTime).ConfigureAwait(false);
 
-                var sourceSubscriptionInner = await source.SubscribeAsync(sink).ConfigureAwait(false);
+                var sourceSubscriptionInner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
 
                 await sourceSubscription.AssignAsync(sourceSubscriptionInner).ConfigureAwait(false);
 
@@ -43,7 +43,7 @@ namespace System.Reactive.Linq
 
                 var (sink, disposable) = await AsyncObserver.Timeout(observer, sourceSubscription, dueTime, scheduler).ConfigureAwait(false);
 
-                var sourceSubscriptionInner = await source.SubscribeAsync(sink).ConfigureAwait(false);
+                var sourceSubscriptionInner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
 
                 await sourceSubscription.AssignAsync(sourceSubscriptionInner).ConfigureAwait(false);
 
@@ -64,7 +64,7 @@ namespace System.Reactive.Linq
 
                 var (sink, disposable) = await AsyncObserver.Timeout(observer, sourceSubscription, dueTime, other).ConfigureAwait(false);
 
-                var sourceSubscriptionInner = await source.SubscribeAsync(sink).ConfigureAwait(false);
+                var sourceSubscriptionInner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
 
                 await sourceSubscription.AssignAsync(sourceSubscriptionInner).ConfigureAwait(false);
 
@@ -87,7 +87,7 @@ namespace System.Reactive.Linq
 
                 var (sink, disposable) = await AsyncObserver.Timeout(observer, sourceSubscription, dueTime, other, scheduler).ConfigureAwait(false);
 
-                var sourceSubscriptionInner = await source.SubscribeAsync(sink).ConfigureAwait(false);
+                var sourceSubscriptionInner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
 
                 await sourceSubscription.AssignAsync(sourceSubscriptionInner).ConfigureAwait(false);
 
@@ -192,7 +192,7 @@ namespace System.Reactive.Linq
 
                         if (hasWon)
                         {
-                            var otherSubscription = await other.SubscribeAsync(observer).RendezVous(scheduler);
+                            var otherSubscription = await other.SubscribeSafeAsync(observer).RendezVous(scheduler);
 
                             await subscription.AssignAsync(otherSubscription).RendezVous(scheduler);
                         }