Browse Source

Internal shortcut for Task.ToObservable().Subscribe(...). This avoids the allocation of an AsyncSubject.

Daniel Weber 7 years ago
parent
commit
438cd71e4a

+ 42 - 1
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 using System.Reactive.Linq;
 using System.Reactive.Linq.ObservableImpl;
 using System.Reactive.Subjects;
@@ -115,6 +116,26 @@ namespace System.Reactive.Threading.Tasks
             }
         }
 
+        internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer)
+        {
+            if (task.IsCompleted)
+            {
+                ToObservableDone(task, observer);
+                return Disposable.Empty;
+            }
+
+            var cts = new CancellationDisposable();
+
+            task.ContinueWith(
+                (t, observerObject) => ToObservableDone(t, (IObserver<Unit>)observerObject), 
+                observer, 
+                cts.Token, 
+                TaskContinuationOptions.ExecuteSynchronously, 
+                TaskScheduler.Default);
+
+            return cts;
+        }
+
         /// <summary>
         /// Returns an observable sequence that propagates the result of the task.
         /// </summary>
@@ -195,7 +216,7 @@ namespace System.Reactive.Threading.Tasks
 
             var options = GetTaskContinuationOptions(scheduler);
 
-            task.ContinueWith(t => ToObservableDone(task, subject), options);
+            task.ContinueWith((t, subjectObject) => ToObservableDone(t, (AsyncSubject<TResult>)subjectObject), subject, options);
 
             return ToObservableResult(subject, scheduler);
         }
@@ -249,6 +270,26 @@ namespace System.Reactive.Threading.Tasks
             return subject.AsObservable();
         }
 
+        internal static IDisposable Subscribe<TResult>(this Task<TResult> task, IObserver<TResult> observer)
+        {
+            if (task.IsCompleted)
+            {
+                ToObservableDone(task, observer);
+                return Disposable.Empty;
+            }
+
+            var cts = new CancellationDisposable();
+
+            task.ContinueWith(
+                (t, observerObject) => ToObservableDone(t, (IObserver<TResult>)observerObject), 
+                observer, 
+                cts.Token, 
+                TaskContinuationOptions.ExecuteSynchronously, 
+                TaskScheduler.Default);
+
+            return cts;
+        }
+
         /// <summary>
         /// Returns a task that will receive the last value or the exception produced by the observable sequence.
         /// </summary>