|
|
@@ -25,7 +25,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
|
|
|
|
|
|
- protected override void Run(_ sink) => sink.Run();
|
|
|
+ protected override void Run(_ sink) => sink.Run(_source);
|
|
|
|
|
|
internal sealed class _ : IdentitySink<TSource>
|
|
|
{
|
|
|
@@ -41,21 +41,15 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_queue = new Queue<TSource>();
|
|
|
}
|
|
|
|
|
|
- private IDisposable _sourceDisposable;
|
|
|
private IDisposable _loopDisposable;
|
|
|
|
|
|
- public void Run()
|
|
|
- {
|
|
|
- Disposable.SetSingle(ref _sourceDisposable, _parent._source.SubscribeSafe(this));
|
|
|
- }
|
|
|
-
|
|
|
protected override void Dispose(bool disposing)
|
|
|
{
|
|
|
if (disposing)
|
|
|
{
|
|
|
Disposable.TryDispose(ref _loopDisposable);
|
|
|
- Disposable.TryDispose(ref _sourceDisposable);
|
|
|
}
|
|
|
+
|
|
|
base.Dispose(disposing);
|
|
|
}
|
|
|
|
|
|
@@ -68,7 +62,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
public override void OnCompleted()
|
|
|
{
|
|
|
- Disposable.TryDispose(ref _sourceDisposable);
|
|
|
+ DisposeUpstream();
|
|
|
|
|
|
var longRunning = _parent._loopScheduler.AsLongRunning();
|
|
|
if (longRunning != null)
|