|
|
@@ -28,50 +28,49 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
var sink = new _(this, observer, cancel);
|
|
|
setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ return sink.Run(_source);
|
|
|
}
|
|
|
|
|
|
private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- // CONSIDER: This sink has a parent reference that can be considered for removal.
|
|
|
+ private readonly TimeSpan _dueTime;
|
|
|
+ private readonly IObservable<TSource> _other;
|
|
|
+ private readonly IScheduler _scheduler;
|
|
|
|
|
|
- private readonly Relative _parent;
|
|
|
+ private readonly object _gate = new object();
|
|
|
+ private SerialDisposable _subscription = new SerialDisposable();
|
|
|
+ private SerialDisposable _timer = new SerialDisposable();
|
|
|
|
|
|
public _(Relative parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
+ _dueTime = parent._dueTime;
|
|
|
+ _other = parent._other;
|
|
|
+ _scheduler = parent._scheduler;
|
|
|
}
|
|
|
|
|
|
- private SerialDisposable _subscription;
|
|
|
- private SerialDisposable _timer;
|
|
|
-
|
|
|
- private object _gate;
|
|
|
private ulong _id;
|
|
|
private bool _switched;
|
|
|
|
|
|
- public IDisposable Run()
|
|
|
+ public IDisposable Run(IObservable<TSource> source)
|
|
|
{
|
|
|
- _subscription = new SerialDisposable();
|
|
|
- _timer = new SerialDisposable();
|
|
|
var original = new SingleAssignmentDisposable();
|
|
|
|
|
|
_subscription.Disposable = original;
|
|
|
|
|
|
- _gate = new object();
|
|
|
_id = 0UL;
|
|
|
_switched = false;
|
|
|
|
|
|
CreateTimer();
|
|
|
|
|
|
- original.Disposable = _parent._source.SubscribeSafe(this);
|
|
|
+ original.Disposable = source.SubscribeSafe(this);
|
|
|
|
|
|
return StableCompositeDisposable.Create(_subscription, _timer);
|
|
|
}
|
|
|
|
|
|
private void CreateTimer()
|
|
|
{
|
|
|
- _timer.Disposable = _parent._scheduler.Schedule(_id, _parent._dueTime, Timeout);
|
|
|
+ _timer.Disposable = _scheduler.Schedule(_id, _dueTime, Timeout);
|
|
|
}
|
|
|
|
|
|
private IDisposable Timeout(IScheduler _, ulong myid)
|
|
|
@@ -85,7 +84,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
|
|
|
if (timerWins)
|
|
|
- _subscription.Disposable = _parent._other.SubscribeSafe(this.GetForwarder());
|
|
|
+ _subscription.Disposable = _other.SubscribeSafe(this.GetForwarder());
|
|
|
|
|
|
return Disposable.Empty;
|
|
|
}
|
|
|
@@ -178,24 +177,23 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
private readonly IObservable<TSource> _other;
|
|
|
|
|
|
+ private readonly object _gate = new object();
|
|
|
+ private readonly SerialDisposable _subscription = new SerialDisposable();
|
|
|
+
|
|
|
public _(IObservable<TSource> other, IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
_other = other;
|
|
|
}
|
|
|
|
|
|
- private SerialDisposable _subscription;
|
|
|
- private object _gate;
|
|
|
private bool _switched;
|
|
|
|
|
|
public IDisposable Run(Absolute parent)
|
|
|
{
|
|
|
- _subscription = new SerialDisposable();
|
|
|
var original = new SingleAssignmentDisposable();
|
|
|
|
|
|
_subscription.Disposable = original;
|
|
|
|
|
|
- _gate = new object();
|
|
|
_switched = false;
|
|
|
|
|
|
var timer = parent._scheduler.Schedule(parent._dueTime, Timeout);
|
|
|
@@ -284,42 +282,40 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
var sink = new _(this, observer, cancel);
|
|
|
setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
|
|
|
private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- // CONSIDER: This sink has a parent reference that can be considered for removal.
|
|
|
+ private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;
|
|
|
+ private readonly IObservable<TSource> _other;
|
|
|
|
|
|
- private readonly Timeout<TSource, TTimeout> _parent;
|
|
|
+ private readonly object _gate = new object();
|
|
|
+ private readonly SerialDisposable _subscription = new SerialDisposable();
|
|
|
+ private readonly SerialDisposable _timer = new SerialDisposable();
|
|
|
|
|
|
public _(Timeout<TSource, TTimeout> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
+ _timeoutSelector = parent._timeoutSelector;
|
|
|
+ _other = parent._other;
|
|
|
}
|
|
|
|
|
|
- private SerialDisposable _subscription;
|
|
|
- private SerialDisposable _timer;
|
|
|
- private object _gate;
|
|
|
private ulong _id;
|
|
|
private bool _switched;
|
|
|
|
|
|
- public IDisposable Run()
|
|
|
+ public IDisposable Run(Timeout<TSource, TTimeout> parent)
|
|
|
{
|
|
|
- _subscription = new SerialDisposable();
|
|
|
- _timer = new SerialDisposable();
|
|
|
var original = new SingleAssignmentDisposable();
|
|
|
|
|
|
_subscription.Disposable = original;
|
|
|
|
|
|
- _gate = new object();
|
|
|
_id = 0UL;
|
|
|
_switched = false;
|
|
|
|
|
|
- SetTimer(_parent._firstTimeout);
|
|
|
+ SetTimer(parent._firstTimeout);
|
|
|
|
|
|
- original.Disposable = _parent._source.SubscribeSafe(this);
|
|
|
+ original.Disposable = parent._source.SubscribeSafe(this);
|
|
|
|
|
|
return StableCompositeDisposable.Create(_subscription, _timer);
|
|
|
}
|
|
|
@@ -333,7 +329,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
var timeout = default(IObservable<TTimeout>);
|
|
|
try
|
|
|
{
|
|
|
- timeout = _parent._timeoutSelector(value);
|
|
|
+ timeout = _timeoutSelector(value);
|
|
|
}
|
|
|
catch (Exception error)
|
|
|
{
|
|
|
@@ -389,7 +385,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
public void OnNext(TTimeout value)
|
|
|
{
|
|
|
if (TimerWins())
|
|
|
- _parent._subscription.Disposable = _parent._parent._other.SubscribeSafe(_parent.GetForwarder());
|
|
|
+ _parent._subscription.Disposable = _parent._other.SubscribeSafe(_parent.GetForwarder());
|
|
|
|
|
|
_self.Dispose();
|
|
|
}
|
|
|
@@ -406,7 +402,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
public void OnCompleted()
|
|
|
{
|
|
|
if (TimerWins())
|
|
|
- _parent._subscription.Disposable = _parent._parent._other.SubscribeSafe(_parent.GetForwarder());
|
|
|
+ _parent._subscription.Disposable = _parent._other.SubscribeSafe(_parent.GetForwarder());
|
|
|
}
|
|
|
|
|
|
private bool TimerWins()
|