소스 검색

Merge pull request #760 from danielcweber/ReviewTaskObservableExtensions

Review TaskObservableExtensions
Daniel C. Weber 7 년 전
부모
커밋
cb8d44dbc8

+ 0 - 3
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -101,7 +101,6 @@ namespace System.Reactive.Linq
                 public Subscription(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync, IObserver<TResult> observer)
                 {
                     _subscription = subscribeAsync(observer, _cts.Token)
-                        .ToObservable()
                         .Subscribe(new TaskCompletionObserver(observer));
                 }
 
@@ -180,7 +179,6 @@ namespace System.Reactive.Linq
                     // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
                     //
                     subscribeAsync(observer, _cts.Token)
-                        .ToObservable()
                         .Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
                 }
 
@@ -262,7 +260,6 @@ namespace System.Reactive.Linq
                     // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
                     //
                     subscribeAsync(observer, _cts.Token)
-                        .ToObservable()
                         .Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
                 }
 

+ 56 - 51
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 using System.Reactive.Linq;
 using System.Reactive.Linq.ObservableImpl;
 using System.Reactive.Subjects;
@@ -58,48 +59,30 @@ namespace System.Reactive.Threading.Tasks
 
         private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
         {
-            var res = default(IObservable<Unit>);
-
             if (task.IsCompleted)
             {
                 scheduler = scheduler ?? ImmediateScheduler.Instance;
 
                 switch (task.Status)
                 {
-                    case TaskStatus.RanToCompletion:
-                        res = new Return<Unit>(Unit.Default, scheduler);
-                        break;
                     case TaskStatus.Faulted:
-                        res = new Throw<Unit>(task.Exception.InnerException, scheduler);
-                        break;
+                        return new Throw<Unit>(task.Exception.InnerException, scheduler);
                     case TaskStatus.Canceled:
-                        res = new Throw<Unit>(new TaskCanceledException(task), scheduler);
-                        break;
+                        return new Throw<Unit>(new TaskCanceledException(task), scheduler);
                 }
-            }
-            else
-            {
-                //
-                // Separate method to avoid closure in synchronous completion case.
-                //
-                res = ToObservableSlow(task, scheduler);
-            }
 
-            return res;
-        }
+                return new Return<Unit>(Unit.Default, scheduler);
+            }
 
-        private static IObservable<Unit> ToObservableSlow(Task task, IScheduler scheduler)
-        {
             var subject = new AsyncSubject<Unit>();
-
             var options = GetTaskContinuationOptions(scheduler);
 
-            task.ContinueWith(t => ToObservableDone(task, subject), options);
+            task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<Unit>)subjectObject), subject, options);
 
-            return ToObservableResult(subject, scheduler);
+            return subject.ToObservableResult(scheduler);
         }
 
-        private static void ToObservableDone(Task task, IObserver<Unit> subject)
+        private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
         {
             switch (task.Status)
             {
@@ -116,6 +99,26 @@ namespace System.Reactive.Threading.Tasks
             }
         }
 
+        internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer)
+        {
+            if (task.IsCompleted)
+            {
+                task.EmitTaskResult(observer);
+                return Disposable.Empty;
+            }
+
+            var cts = new CancellationDisposable();
+
+            task.ContinueWith(
+                (t, observerObject) => t.EmitTaskResult((IObserver<Unit>)observerObject), 
+                observer, 
+                cts.Token, 
+                TaskContinuationOptions.ExecuteSynchronously, 
+                TaskScheduler.Default);
+
+            return cts;
+        }
+
         /// <summary>
         /// Returns an observable sequence that propagates the result of the task.
         /// </summary>
@@ -160,48 +163,30 @@ namespace System.Reactive.Threading.Tasks
 
         private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
         {
-            var res = default(IObservable<TResult>);
-
             if (task.IsCompleted)
             {
                 scheduler = scheduler ?? ImmediateScheduler.Instance;
 
                 switch (task.Status)
                 {
-                    case TaskStatus.RanToCompletion:
-                        res = new Return<TResult>(task.Result, scheduler);
-                        break;
                     case TaskStatus.Faulted:
-                        res = new Throw<TResult>(task.Exception.InnerException, scheduler);
-                        break;
+                        return new Throw<TResult>(task.Exception.InnerException, scheduler);
                     case TaskStatus.Canceled:
-                        res = new Throw<TResult>(new TaskCanceledException(task), scheduler);
-                        break;
+                        return new Throw<TResult>(new TaskCanceledException(task), scheduler);
                 }
-            }
-            else
-            {
-                //
-                // Separate method to avoid closure in synchronous completion case.
-                //
-                res = ToObservableSlow(task, scheduler);
-            }
 
-            return res;
-        }
+                return new Return<TResult>(task.Result, scheduler);
+            }
 
-        private static IObservable<TResult> ToObservableSlow<TResult>(Task<TResult> task, IScheduler scheduler)
-        {
             var subject = new AsyncSubject<TResult>();
-
             var options = GetTaskContinuationOptions(scheduler);
 
-            task.ContinueWith(t => ToObservableDone(task, subject), options);
+            task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<TResult>)subjectObject), subject, options);
 
-            return ToObservableResult(subject, scheduler);
+            return subject.ToObservableResult(scheduler);
         }
 
-        private static void ToObservableDone<TResult>(Task<TResult> task, IObserver<TResult> subject)
+        private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
         {
             switch (task.Status)
             {
@@ -240,7 +225,7 @@ namespace System.Reactive.Threading.Tasks
             return options;
         }
 
-        private static IObservable<TResult> ToObservableResult<TResult>(AsyncSubject<TResult> subject, IScheduler scheduler)
+        private static IObservable<TResult> ToObservableResult<TResult>(this AsyncSubject<TResult> subject, IScheduler scheduler)
         {
             if (scheduler != null)
             {
@@ -250,6 +235,26 @@ namespace System.Reactive.Threading.Tasks
             return subject.AsObservable();
         }
 
+        internal static IDisposable Subscribe<TResult>(this Task<TResult> task, IObserver<TResult> observer)
+        {
+            if (task.IsCompleted)
+            {
+                task.EmitTaskResult(observer);
+                return Disposable.Empty;
+            }
+
+            var cts = new CancellationDisposable();
+
+            task.ContinueWith(
+                (t, observerObject) => t.EmitTaskResult((IObserver<TResult>)observerObject), 
+                observer, 
+                cts.Token, 
+                TaskContinuationOptions.ExecuteSynchronously, 
+                TaskScheduler.Default);
+
+            return cts;
+        }
+
         /// <summary>
         /// Returns a task that will receive the last value or the exception produced by the observable sequence.
         /// </summary>