Browse Source

Additional overload for SubscribeOn.

Bart De Smet 8 years ago
parent
commit
482c9d1845

+ 17 - 7
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/SubscribeOn.cs

@@ -7,8 +7,6 @@ using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq
 {
-    // REVIEW: Consider more fine-grained control over subscribe and dispose schedulers.
-
     partial class AsyncObservable
     {
         public static IAsyncObservable<TSource> SubscribeOn<TSource>(this IAsyncObservable<TSource> source, IAsyncScheduler scheduler)
@@ -18,6 +16,18 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
+            return SubscribeOn(source, scheduler, scheduler);
+        }
+
+        public static IAsyncObservable<TSource> SubscribeOn<TSource>(this IAsyncObservable<TSource> source, IAsyncScheduler subscribeScheduler, IAsyncScheduler disposeScheduler)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (subscribeScheduler == null)
+                throw new ArgumentNullException(nameof(subscribeScheduler));
+            if (disposeScheduler == null)
+                throw new ArgumentNullException(nameof(disposeScheduler));
+
             return Create<TSource>(async observer =>
             {
                 var m = new SingleAssignmentAsyncDisposable();
@@ -25,19 +35,19 @@ namespace System.Reactive.Linq
 
                 await d.AssignAsync(m).ConfigureAwait(false);
 
-                var scheduled = await scheduler.ScheduleAsync(async ct =>
+                var scheduled = await subscribeScheduler.ScheduleAsync(async ct =>
                 {
-                    var subscription = await source.SubscribeSafeAsync(observer).RendezVous(scheduler, ct);
+                    var subscription = await source.SubscribeSafeAsync(observer).RendezVous(subscribeScheduler, ct);
 
                     var scheduledDispose = AsyncDisposable.Create(async () =>
                     {
-                        await scheduler.ScheduleAsync(async _ =>
+                        await disposeScheduler.ScheduleAsync(async _ =>
                         {
-                            await subscription.DisposeAsync().RendezVous(scheduler, ct);
+                            await subscription.DisposeAsync().RendezVous(disposeScheduler, ct);
                         }).ConfigureAwait(false);
                     });
 
-                    await d.AssignAsync(scheduledDispose).RendezVous(scheduler, ct);
+                    await d.AssignAsync(scheduledDispose).RendezVous(subscribeScheduler, ct);
                 }).ConfigureAwait(false);
 
                 await m.AssignAsync(scheduled).ConfigureAwait(false);