Bläddra i källkod

Avoid continuation leaks in TaskObservableExtensions.ToObservable. This is a bit more involved. The continuation must not be registered before a subscription happens because it might leak on long lived tasks. So instead of registering a continuation on a task once and forwarding the results to an AsyncSubject, we construct an Observable that registers the continuation for each subscription. Memory performance might of course go down if a lot of subscriptions happen on a task, but it won't leak.

Daniel Weber 7 år sedan
förälder
incheckning
26cbadf62c

+ 61 - 12
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -17,6 +17,65 @@ namespace System.Reactive.Threading.Tasks
     /// </summary>
     public static class TaskObservableExtensions
     {
+        private sealed class SlowTaskObservable : IObservable<Unit>
+        {
+            private readonly Task _task;
+            private readonly IScheduler _scheduler;
+
+            public SlowTaskObservable(Task task, IScheduler scheduler)
+            {
+                _task = task;
+                _scheduler = scheduler;
+            }
+
+            public IDisposable Subscribe(IObserver<Unit> observer)
+            {
+                if (observer == null)
+                {
+                    throw new ArgumentNullException(nameof(observer));
+                }
+
+                var cts = new CancellationDisposable();
+                var options = GetTaskContinuationOptions(_scheduler);
+
+                if (_scheduler == null)
+                    _task.ContinueWith((t, subjectObject) => t.EmitTaskResult((IObserver<Unit>)subjectObject), observer, cts.Token, options, TaskScheduler.Current);
+                else
+                    _task.ContinueWith((t, subjectObject) => _scheduler.ScheduleAction((t, subjectObject), tuple => tuple.t.EmitTaskResult((IObserver<Unit>)tuple.subjectObject)), observer, cts.Token, options, TaskScheduler.Current);
+
+                return cts;
+            }
+        }
+
+        private sealed class SlowTaskObservable<TResult> : IObservable<TResult>
+        {
+            private readonly Task<TResult> _task;
+            private readonly IScheduler _scheduler;
+
+            public SlowTaskObservable(Task<TResult> task, IScheduler scheduler)
+            {
+                _task = task;
+                _scheduler = scheduler;
+            }
+
+            public IDisposable Subscribe(IObserver<TResult> observer)
+            {
+                if (observer == null)
+                {
+                    throw new ArgumentNullException(nameof(observer));
+                }
+
+                var cts = new CancellationDisposable();
+                var options = GetTaskContinuationOptions(_scheduler);
+
+                if (_scheduler == null)
+                    _task.ContinueWith((t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject), observer, cts.Token, options, TaskScheduler.Current);
+                else
+                    _task.ContinueWith((t, subjectObject) => _scheduler.ScheduleAction((t, subjectObject), tuple => tuple.t.EmitTaskResult((IObserver<TResult>)tuple.subjectObject)), observer, cts.Token, options, TaskScheduler.Current);
+
+                return cts;
+            }
+        }
         /// <summary>
         /// Returns an observable sequence that signals when the task completes.
         /// </summary>
@@ -74,12 +133,7 @@ namespace System.Reactive.Threading.Tasks
                 return new Return<Unit>(Unit.Default, scheduler);
             }
 
-            var subject = new AsyncSubject<Unit>();
-            var options = GetTaskContinuationOptions(scheduler);
-
-            task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<Unit>)subjectObject), subject, options);
-
-            return subject.ToObservableResult(scheduler);
+            return new SlowTaskObservable(task, scheduler);
         }
 
         private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
@@ -178,12 +232,7 @@ namespace System.Reactive.Threading.Tasks
                 return new Return<TResult>(task.Result, scheduler);
             }
 
-            var subject = new AsyncSubject<TResult>();
-            var options = GetTaskContinuationOptions(scheduler);
-
-            task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<TResult>)subjectObject), subject, options);
-
-            return subject.ToObservableResult(scheduler);
+            return new SlowTaskObservable<TResult>(task, scheduler);
         }
 
         private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)