Преглед изворни кода

Make EmitTaskResult an extension method.

Daniel Weber пре 7 година
родитељ
комит
ad703e9360

+ 8 - 8
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -83,7 +83,7 @@ namespace System.Reactive.Threading.Tasks
                 var subject = new AsyncSubject<Unit>();
                 var options = GetTaskContinuationOptions(scheduler);
 
-                task.ContinueWith((t, subjectObject) => EmitTaskResult(t, (AsyncSubject<Unit>)subjectObject), subject, options);
+                task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<Unit>)subjectObject), subject, options);
 
                 return ToObservableResult(subject, scheduler);
             }
@@ -91,7 +91,7 @@ namespace System.Reactive.Threading.Tasks
             return res;
         }
 
-        private static void EmitTaskResult(Task task, IObserver<Unit> subject)
+        private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
         {
             switch (task.Status)
             {
@@ -112,14 +112,14 @@ namespace System.Reactive.Threading.Tasks
         {
             if (task.IsCompleted)
             {
-                EmitTaskResult(task, observer);
+                task.EmitTaskResult(observer);
                 return Disposable.Empty;
             }
 
             var cts = new CancellationDisposable();
 
             task.ContinueWith(
-                (t, observerObject) => EmitTaskResult(t, (IObserver<Unit>)observerObject), 
+                (t, observerObject) => t.EmitTaskResult((IObserver<Unit>)observerObject), 
                 observer, 
                 cts.Token, 
                 TaskContinuationOptions.ExecuteSynchronously, 
@@ -196,7 +196,7 @@ namespace System.Reactive.Threading.Tasks
                 var subject = new AsyncSubject<TResult>();
                 var options = GetTaskContinuationOptions(scheduler);
 
-                task.ContinueWith((t, subjectObject) => EmitTaskResult(t, (AsyncSubject<TResult>)subjectObject), subject, options);
+                task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<TResult>)subjectObject), subject, options);
 
                 return ToObservableResult(subject, scheduler);
             }
@@ -204,7 +204,7 @@ namespace System.Reactive.Threading.Tasks
             return res;
         }
 
-        private static void EmitTaskResult<TResult>(Task<TResult> task, IObserver<TResult> subject)
+        private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
         {
             switch (task.Status)
             {
@@ -257,14 +257,14 @@ namespace System.Reactive.Threading.Tasks
         {
             if (task.IsCompleted)
             {
-                EmitTaskResult(task, observer);
+                task.EmitTaskResult(observer);
                 return Disposable.Empty;
             }
 
             var cts = new CancellationDisposable();
 
             task.ContinueWith(
-                (t, observerObject) => EmitTaskResult(t, (IObserver<TResult>)observerObject), 
+                (t, observerObject) => t.EmitTaskResult((IObserver<TResult>)observerObject), 
                 observer, 
                 cts.Token, 
                 TaskContinuationOptions.ExecuteSynchronously,