|
|
@@ -288,6 +288,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
|
|
|
private readonly object _gate = new object();
|
|
|
+ private readonly CancellationTokenSource _cts = new CancellationTokenSource();
|
|
|
private volatile int _count = 1;
|
|
|
|
|
|
public override void OnNext(Task<TSource> value)
|
|
|
@@ -299,7 +300,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- value.ContinueWith((t, thisObject) => ((_)thisObject).OnCompletedTask(t), this);
|
|
|
+ value.ContinueWith((t, thisObject) => ((_)thisObject).OnCompletedTask(t), this, _cts.Token);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -354,6 +355,14 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ protected override void Dispose(bool disposing)
|
|
|
+ {
|
|
|
+ if (disposing)
|
|
|
+ _cts.Cancel();
|
|
|
+
|
|
|
+ base.Dispose(disposing);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|