Browse Source

SelectMany: Cancel continuations as soon as the Sink is disposed to avoid a leaking continuation if the task lives longer than the Sink.

Daniel Weber 7 years ago
parent
commit
c4121798ff
1 changed files with 4 additions and 4 deletions
  1. 4 4
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs

@@ -597,7 +597,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     //
                     // Separate method to avoid closure in synchronous completion case.
                     //
-                    task.ContinueWith(t => OnCompletedTask(value, t));
+                    task.ContinueWith(t => OnCompletedTask(value, t), _cancel.Token);
                 }
 
                 private void OnCompletedTask(TSource value, Task<TCollection> task)
@@ -758,7 +758,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     //
                     // Separate method to avoid closure in synchronous completion case.
                     //
-                    task.ContinueWith(t => OnCompletedTask(value, index, t));
+                    task.ContinueWith(t => OnCompletedTask(value, index, t), _cancel.Token);
                 }
 
                 private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
@@ -1538,7 +1538,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this);
+                        task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this, _cts.Token);
                     }
                 }
 
@@ -1670,7 +1670,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this);
+                        task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this, _cts.Token);
                     }
                 }