|
|
@@ -24,19 +24,19 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
var sink = new _(this, observer, cancel);
|
|
|
setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ return sink.Run(_source);
|
|
|
}
|
|
|
|
|
|
private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- // CONSIDER: This sink has a parent reference that can be considered for removal.
|
|
|
-
|
|
|
- private readonly Throttle<TSource> _parent;
|
|
|
+ private readonly TimeSpan _dueTime;
|
|
|
+ private readonly IScheduler _scheduler;
|
|
|
|
|
|
public _(Throttle<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
+ _dueTime = parent._dueTime;
|
|
|
+ _scheduler = parent._scheduler;
|
|
|
}
|
|
|
|
|
|
private object _gate;
|
|
|
@@ -45,7 +45,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
private SerialDisposable _cancelable;
|
|
|
private ulong _id;
|
|
|
|
|
|
- public IDisposable Run()
|
|
|
+ public IDisposable Run(IObservable<TSource> source)
|
|
|
{
|
|
|
_gate = new object();
|
|
|
_value = default(TSource);
|
|
|
@@ -53,7 +53,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_cancelable = new SerialDisposable();
|
|
|
_id = 0UL;
|
|
|
|
|
|
- var subscription = _parent._source.SubscribeSafe(this);
|
|
|
+ var subscription = source.SubscribeSafe(this);
|
|
|
|
|
|
return StableCompositeDisposable.Create(subscription, _cancelable);
|
|
|
}
|
|
|
@@ -70,7 +70,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
var d = new SingleAssignmentDisposable();
|
|
|
_cancelable.Disposable = d;
|
|
|
- d.Disposable = _parent._scheduler.Schedule(currentid, _parent._dueTime, Propagate);
|
|
|
+ d.Disposable = _scheduler.Schedule(currentid, _dueTime, Propagate);
|
|
|
}
|
|
|
|
|
|
private IDisposable Propagate(IScheduler self, ulong currentid)
|