|
@@ -9,537 +9,602 @@ using System.Threading;
|
|
|
|
|
|
namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
- internal sealed class Delay<TSource> : Producer<TSource>
|
|
|
+ internal static class Delay<TSource>
|
|
|
{
|
|
|
- private readonly IObservable<TSource> _source;
|
|
|
- private readonly TimeSpan? _dueTimeR;
|
|
|
- private readonly DateTimeOffset? _dueTimeA;
|
|
|
- private readonly IScheduler _scheduler;
|
|
|
-
|
|
|
- public Delay(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
|
|
|
+ internal abstract class Base : Producer<TSource>
|
|
|
{
|
|
|
- _source = source;
|
|
|
- _dueTimeR = dueTime;
|
|
|
- _scheduler = scheduler;
|
|
|
- }
|
|
|
-
|
|
|
- public Delay(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
|
|
|
- {
|
|
|
- _source = source;
|
|
|
- _dueTimeA = dueTime;
|
|
|
- _scheduler = scheduler;
|
|
|
- }
|
|
|
+ protected readonly IObservable<TSource> _source;
|
|
|
+ protected readonly IScheduler _scheduler;
|
|
|
|
|
|
- protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
- {
|
|
|
- if (_scheduler.AsLongRunning() != null)
|
|
|
- {
|
|
|
- var sink = new LongRunningImpl(this, observer, cancel);
|
|
|
- setSink(sink);
|
|
|
- return sink.Run();
|
|
|
- }
|
|
|
- else
|
|
|
+ public Base(IObservable<TSource> source, IScheduler scheduler)
|
|
|
{
|
|
|
- var sink = new _(this, observer, cancel);
|
|
|
- setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ _source = source;
|
|
|
+ _scheduler = scheduler;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- class _ : Sink<TSource>, IObserver<TSource>
|
|
|
- {
|
|
|
- private readonly Delay<TSource> _parent;
|
|
|
|
|
|
- public _(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
- : base(observer, cancel)
|
|
|
+ protected abstract class _<TParent> : Sink<TSource>, IObserver<TSource>
|
|
|
+ where TParent : Base
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
- }
|
|
|
+ protected readonly object _gate = new object();
|
|
|
+ protected readonly SerialDisposable _cancelable = new SerialDisposable();
|
|
|
|
|
|
- private IScheduler _scheduler;
|
|
|
- private IDisposable _sourceSubscription;
|
|
|
- private SerialDisposable _cancelable;
|
|
|
- private TimeSpan _delay;
|
|
|
- private IStopwatch _watch;
|
|
|
-
|
|
|
- private object _gate;
|
|
|
- private bool _ready;
|
|
|
- private bool _active;
|
|
|
- private bool _running;
|
|
|
- private Queue<System.Reactive.TimeInterval<TSource>> _queue;
|
|
|
- private bool _hasCompleted;
|
|
|
- private TimeSpan _completeAt;
|
|
|
- private bool _hasFailed;
|
|
|
- private Exception _exception;
|
|
|
-
|
|
|
- public IDisposable Run()
|
|
|
- {
|
|
|
- _scheduler = _parent._scheduler;
|
|
|
+ protected readonly IScheduler _scheduler;
|
|
|
+
|
|
|
+ public _(TParent parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ : base(observer, cancel)
|
|
|
+ {
|
|
|
+ _scheduler = parent._scheduler;
|
|
|
+ }
|
|
|
|
|
|
- _cancelable = new SerialDisposable();
|
|
|
+ private IDisposable _sourceSubscription;
|
|
|
|
|
|
- _gate = new object();
|
|
|
- _active = false;
|
|
|
- _running = false;
|
|
|
- _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
|
|
|
- _hasCompleted = false;
|
|
|
- _completeAt = default(TimeSpan);
|
|
|
- _hasFailed = false;
|
|
|
- _exception = default(Exception);
|
|
|
+ protected IStopwatch _watch;
|
|
|
+ protected TimeSpan _delay;
|
|
|
+ protected bool _ready;
|
|
|
+ protected bool _active;
|
|
|
+ protected bool _running;
|
|
|
+ protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<Reactive.TimeInterval<TSource>>();
|
|
|
|
|
|
- _watch = _scheduler.StartStopwatch();
|
|
|
+ private bool _hasCompleted;
|
|
|
+ private TimeSpan _completeAt;
|
|
|
+ private bool _hasFailed;
|
|
|
+ private Exception _exception;
|
|
|
|
|
|
- if (_parent._dueTimeA.HasValue)
|
|
|
+ public IDisposable Run(TParent parent)
|
|
|
{
|
|
|
- _ready = false;
|
|
|
+ _active = false;
|
|
|
+ _running = false;
|
|
|
+ _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
|
|
|
+ _hasCompleted = false;
|
|
|
+ _completeAt = default(TimeSpan);
|
|
|
+ _hasFailed = false;
|
|
|
+ _exception = default(Exception);
|
|
|
|
|
|
- var dueTimeA = _parent._dueTimeA.Value;
|
|
|
- _cancelable.Disposable = _scheduler.Schedule(dueTimeA, Start);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- _ready = true;
|
|
|
+ _watch = _scheduler.StartStopwatch();
|
|
|
+
|
|
|
+ RunCore(parent);
|
|
|
+
|
|
|
+ var sourceSubscription = new SingleAssignmentDisposable();
|
|
|
+ _sourceSubscription = sourceSubscription;
|
|
|
+ sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
|
|
|
|
|
|
- var dueTimeR = _parent._dueTimeR.Value;
|
|
|
- _delay = Scheduler.Normalize(dueTimeR);
|
|
|
+ return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
|
|
|
}
|
|
|
|
|
|
- var sourceSubscription = new SingleAssignmentDisposable();
|
|
|
- _sourceSubscription = sourceSubscription;
|
|
|
- sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
|
|
|
+ protected abstract void RunCore(TParent parent);
|
|
|
|
|
|
- return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
|
|
|
- }
|
|
|
+ public void OnNext(TSource value)
|
|
|
+ {
|
|
|
+ var next = _watch.Elapsed.Add(_delay);
|
|
|
+ var shouldRun = false;
|
|
|
|
|
|
- private void Start()
|
|
|
- {
|
|
|
- var next = default(TimeSpan);
|
|
|
- var shouldRun = false;
|
|
|
+ lock (_gate)
|
|
|
+ {
|
|
|
+ _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
|
|
|
|
|
|
- lock (_gate)
|
|
|
+ shouldRun = _ready && !_active;
|
|
|
+ _active = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (shouldRun)
|
|
|
+ {
|
|
|
+ _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void OnError(Exception error)
|
|
|
{
|
|
|
- _delay = _watch.Elapsed;
|
|
|
+ _sourceSubscription.Dispose();
|
|
|
|
|
|
- var oldQueue = _queue;
|
|
|
- _queue = new Queue<Reactive.TimeInterval<TSource>>();
|
|
|
+ var shouldRun = false;
|
|
|
|
|
|
- if (oldQueue.Count > 0)
|
|
|
+ lock (_gate)
|
|
|
{
|
|
|
- next = oldQueue.Peek().Interval;
|
|
|
+ _queue.Clear();
|
|
|
|
|
|
- while (oldQueue.Count > 0)
|
|
|
- {
|
|
|
- var item = oldQueue.Dequeue();
|
|
|
- _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
|
|
|
- }
|
|
|
+ _exception = error;
|
|
|
+ _hasFailed = true;
|
|
|
|
|
|
- shouldRun = true;
|
|
|
- _active = true;
|
|
|
+ shouldRun = !_running;
|
|
|
}
|
|
|
|
|
|
- _ready = true;
|
|
|
+ if (shouldRun)
|
|
|
+ {
|
|
|
+ base._observer.OnError(error);
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- if (shouldRun)
|
|
|
+ public void OnCompleted()
|
|
|
{
|
|
|
- _cancelable.Disposable = _scheduler.Schedule(next, DrainQueue);
|
|
|
- }
|
|
|
- }
|
|
|
+ _sourceSubscription.Dispose();
|
|
|
|
|
|
- public void OnNext(TSource value)
|
|
|
- {
|
|
|
- var next = _watch.Elapsed.Add(_delay);
|
|
|
- var shouldRun = false;
|
|
|
+ var next = _watch.Elapsed.Add(_delay);
|
|
|
+ var shouldRun = false;
|
|
|
|
|
|
- lock (_gate)
|
|
|
- {
|
|
|
- _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
|
|
|
+ lock (_gate)
|
|
|
+ {
|
|
|
+ _completeAt = next;
|
|
|
+ _hasCompleted = true;
|
|
|
+
|
|
|
+ shouldRun = _ready && !_active;
|
|
|
+ _active = true;
|
|
|
+ }
|
|
|
|
|
|
- shouldRun = _ready && !_active;
|
|
|
- _active = true;
|
|
|
+ if (shouldRun)
|
|
|
+ {
|
|
|
+ _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- if (shouldRun)
|
|
|
+ protected void DrainQueue(Action<TimeSpan> recurse)
|
|
|
{
|
|
|
- _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
|
|
|
+ lock (_gate)
|
|
|
+ {
|
|
|
+ if (_hasFailed)
|
|
|
+ return;
|
|
|
+ _running = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // The shouldYield flag was added to address TFS 487881: "Delay can be unfair". In the old
|
|
|
+ // implementation, the loop below kept running while there was work for immediate dispatch,
|
|
|
+ // potentially causing a long running work item on the target scheduler. With the addition
|
|
|
+ // of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
|
|
|
+ // interface and perform different processing (see LongRunningImpl). To reduce the code
|
|
|
+ // churn in the old loop code here, we set the shouldYield flag to true after the first
|
|
|
+ // dispatch iteration, in order to break from the loop and enter the recursive scheduling path.
|
|
|
+ //
|
|
|
+ var shouldYield = false;
|
|
|
+
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ var hasFailed = false;
|
|
|
+ var error = default(Exception);
|
|
|
+
|
|
|
+ var hasValue = false;
|
|
|
+ var value = default(TSource);
|
|
|
+ var hasCompleted = false;
|
|
|
+
|
|
|
+ var shouldRecurse = false;
|
|
|
+ var recurseDueTime = default(TimeSpan);
|
|
|
+
|
|
|
+ lock (_gate)
|
|
|
+ {
|
|
|
+ if (_hasFailed)
|
|
|
+ {
|
|
|
+ error = _exception;
|
|
|
+ hasFailed = true;
|
|
|
+ _running = false;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ var now = _watch.Elapsed;
|
|
|
+
|
|
|
+ if (_queue.Count > 0)
|
|
|
+ {
|
|
|
+ var nextDue = _queue.Peek().Interval;
|
|
|
+
|
|
|
+ if (nextDue.CompareTo(now) <= 0 && !shouldYield)
|
|
|
+ {
|
|
|
+ value = _queue.Dequeue().Value;
|
|
|
+ hasValue = true;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ shouldRecurse = true;
|
|
|
+ recurseDueTime = Scheduler.Normalize(nextDue.Subtract(now));
|
|
|
+ _running = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else if (_hasCompleted)
|
|
|
+ {
|
|
|
+ if (_completeAt.CompareTo(now) <= 0 && !shouldYield)
|
|
|
+ {
|
|
|
+ hasCompleted = true;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ shouldRecurse = true;
|
|
|
+ recurseDueTime = Scheduler.Normalize(_completeAt.Subtract(now));
|
|
|
+ _running = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ _running = false;
|
|
|
+ _active = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } /* lock (_gate) */
|
|
|
+
|
|
|
+ if (hasValue)
|
|
|
+ {
|
|
|
+ base._observer.OnNext(value);
|
|
|
+ shouldYield = true;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (hasCompleted)
|
|
|
+ {
|
|
|
+ base._observer.OnCompleted();
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
+ else if (hasFailed)
|
|
|
+ {
|
|
|
+ base._observer.OnError(error);
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
+ else if (shouldRecurse)
|
|
|
+ {
|
|
|
+ recurse(recurseDueTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } /* while (true) */
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ protected abstract class L<TParent> : Sink<TSource>, IObserver<TSource>
|
|
|
+ where TParent : Base
|
|
|
{
|
|
|
- _sourceSubscription.Dispose();
|
|
|
+ protected readonly object _gate = new object();
|
|
|
+ protected readonly SerialDisposable _cancelable = new SerialDisposable();
|
|
|
+ private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);
|
|
|
|
|
|
- var shouldRun = false;
|
|
|
+ private readonly IScheduler _scheduler;
|
|
|
|
|
|
- lock (_gate)
|
|
|
+ public L(TParent parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ : base(observer, cancel)
|
|
|
{
|
|
|
- _queue.Clear();
|
|
|
+ _scheduler = parent._scheduler;
|
|
|
+ }
|
|
|
|
|
|
- _exception = error;
|
|
|
- _hasFailed = true;
|
|
|
+ private IDisposable _sourceSubscription;
|
|
|
|
|
|
- shouldRun = !_running;
|
|
|
- }
|
|
|
+ protected IStopwatch _watch;
|
|
|
+ protected TimeSpan _delay;
|
|
|
+ protected Queue<System.Reactive.TimeInterval<TSource>> _queue;
|
|
|
|
|
|
- if (shouldRun)
|
|
|
+ private CancellationTokenSource _stop;
|
|
|
+ private bool _hasCompleted;
|
|
|
+ private TimeSpan _completeAt;
|
|
|
+ private bool _hasFailed;
|
|
|
+ private Exception _exception;
|
|
|
+
|
|
|
+ public IDisposable Run(TParent parent)
|
|
|
{
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
- }
|
|
|
+ _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
|
|
|
+ _hasCompleted = false;
|
|
|
+ _completeAt = default(TimeSpan);
|
|
|
+ _hasFailed = false;
|
|
|
+ _exception = default(Exception);
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
- {
|
|
|
- _sourceSubscription.Dispose();
|
|
|
+ _watch = _scheduler.StartStopwatch();
|
|
|
|
|
|
- var next = _watch.Elapsed.Add(_delay);
|
|
|
- var shouldRun = false;
|
|
|
+ RunCore(parent);
|
|
|
|
|
|
- lock (_gate)
|
|
|
- {
|
|
|
- _completeAt = next;
|
|
|
- _hasCompleted = true;
|
|
|
+ var sourceSubscription = new SingleAssignmentDisposable();
|
|
|
+ _sourceSubscription = sourceSubscription;
|
|
|
+ sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
|
|
|
|
|
|
- shouldRun = _ready && !_active;
|
|
|
- _active = true;
|
|
|
+ return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
|
|
|
}
|
|
|
|
|
|
- if (shouldRun)
|
|
|
+ protected abstract void RunCore(TParent parent);
|
|
|
+
|
|
|
+ protected void ScheduleDrain()
|
|
|
{
|
|
|
- _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
|
|
|
+ _stop = new CancellationTokenSource();
|
|
|
+ _cancelable.Disposable = Disposable.Create(_stop.Cancel);
|
|
|
+
|
|
|
+ _scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- private void DrainQueue(Action<TimeSpan> recurse)
|
|
|
- {
|
|
|
- lock (_gate)
|
|
|
+ public void OnNext(TSource value)
|
|
|
{
|
|
|
- if (_hasFailed)
|
|
|
- return;
|
|
|
- _running = true;
|
|
|
- }
|
|
|
+ var next = _watch.Elapsed.Add(_delay);
|
|
|
+
|
|
|
+ lock (_gate)
|
|
|
+ {
|
|
|
+ _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
|
|
|
|
|
|
- //
|
|
|
- // The shouldYield flag was added to address TFS 487881: "Delay can be unfair". In the old
|
|
|
- // implementation, the loop below kept running while there was work for immediate dispatch,
|
|
|
- // potentially causing a long running work item on the target scheduler. With the addition
|
|
|
- // of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
|
|
|
- // interface and perform different processing (see LongRunningImpl). To reduce the code
|
|
|
- // churn in the old loop code here, we set the shouldYield flag to true after the first
|
|
|
- // dispatch iteration, in order to break from the loop and enter the recursive scheduling path.
|
|
|
- //
|
|
|
- var shouldYield = false;
|
|
|
+ _evt.Release();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- while (true)
|
|
|
+ public void OnError(Exception error)
|
|
|
{
|
|
|
- var hasFailed = false;
|
|
|
- var error = default(Exception);
|
|
|
+ _sourceSubscription.Dispose();
|
|
|
+
|
|
|
+ lock (_gate)
|
|
|
+ {
|
|
|
+ _queue.Clear();
|
|
|
|
|
|
- var hasValue = false;
|
|
|
- var value = default(TSource);
|
|
|
- var hasCompleted = false;
|
|
|
+ _exception = error;
|
|
|
+ _hasFailed = true;
|
|
|
|
|
|
- var shouldRecurse = false;
|
|
|
- var recurseDueTime = default(TimeSpan);
|
|
|
+ _evt.Release();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void OnCompleted()
|
|
|
+ {
|
|
|
+ _sourceSubscription.Dispose();
|
|
|
+
|
|
|
+ var next = _watch.Elapsed.Add(_delay);
|
|
|
|
|
|
lock (_gate)
|
|
|
{
|
|
|
- if (_hasFailed)
|
|
|
+ _completeAt = next;
|
|
|
+ _hasCompleted = true;
|
|
|
+
|
|
|
+ _evt.Release();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void DrainQueue(ICancelable cancel)
|
|
|
+ {
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ try
|
|
|
{
|
|
|
- error = _exception;
|
|
|
- hasFailed = true;
|
|
|
- _running = false;
|
|
|
+ _evt.Wait(_stop.Token);
|
|
|
}
|
|
|
- else
|
|
|
+ catch (OperationCanceledException)
|
|
|
{
|
|
|
- var now = _watch.Elapsed;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ var hasFailed = false;
|
|
|
+ var error = default(Exception);
|
|
|
|
|
|
- if (_queue.Count > 0)
|
|
|
+ var hasValue = false;
|
|
|
+ var value = default(TSource);
|
|
|
+ var hasCompleted = false;
|
|
|
+
|
|
|
+ var shouldWait = false;
|
|
|
+ var waitTime = default(TimeSpan);
|
|
|
+
|
|
|
+ lock (_gate)
|
|
|
+ {
|
|
|
+ if (_hasFailed)
|
|
|
{
|
|
|
- var nextDue = _queue.Peek().Interval;
|
|
|
+ error = _exception;
|
|
|
+ hasFailed = true;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ var now = _watch.Elapsed;
|
|
|
|
|
|
- if (nextDue.CompareTo(now) <= 0 && !shouldYield)
|
|
|
+ if (_queue.Count > 0)
|
|
|
{
|
|
|
- value = _queue.Dequeue().Value;
|
|
|
+ var next = _queue.Dequeue();
|
|
|
+
|
|
|
hasValue = true;
|
|
|
+ value = next.Value;
|
|
|
+
|
|
|
+ var nextDue = next.Interval;
|
|
|
+ if (nextDue.CompareTo(now) > 0)
|
|
|
+ {
|
|
|
+ shouldWait = true;
|
|
|
+ waitTime = Scheduler.Normalize(nextDue.Subtract(now));
|
|
|
+ }
|
|
|
}
|
|
|
- else
|
|
|
+ else if (_hasCompleted)
|
|
|
{
|
|
|
- shouldRecurse = true;
|
|
|
- recurseDueTime = Scheduler.Normalize(nextDue.Subtract(now));
|
|
|
- _running = false;
|
|
|
+ hasCompleted = true;
|
|
|
+
|
|
|
+ if (_completeAt.CompareTo(now) > 0)
|
|
|
+ {
|
|
|
+ shouldWait = true;
|
|
|
+ waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- else if (_hasCompleted)
|
|
|
+ } /* lock (_gate) */
|
|
|
+
|
|
|
+ if (shouldWait)
|
|
|
+ {
|
|
|
+ var timer = new ManualResetEventSlim();
|
|
|
+ _scheduler.Schedule(waitTime, () => { timer.Set(); });
|
|
|
+
|
|
|
+ try
|
|
|
{
|
|
|
- if (_completeAt.CompareTo(now) <= 0 && !shouldYield)
|
|
|
- {
|
|
|
- hasCompleted = true;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- shouldRecurse = true;
|
|
|
- recurseDueTime = Scheduler.Normalize(_completeAt.Subtract(now));
|
|
|
- _running = false;
|
|
|
- }
|
|
|
+ timer.Wait(_stop.Token);
|
|
|
}
|
|
|
- else
|
|
|
+ catch (OperationCanceledException)
|
|
|
{
|
|
|
- _running = false;
|
|
|
- _active = false;
|
|
|
+ return;
|
|
|
}
|
|
|
}
|
|
|
- } /* lock (_gate) */
|
|
|
|
|
|
- if (hasValue)
|
|
|
- {
|
|
|
- base._observer.OnNext(value);
|
|
|
- shouldYield = true;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- if (hasCompleted)
|
|
|
- {
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
- else if (hasFailed)
|
|
|
+ if (hasValue)
|
|
|
{
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
+ base._observer.OnNext(value);
|
|
|
}
|
|
|
- else if (shouldRecurse)
|
|
|
+ else
|
|
|
{
|
|
|
- recurse(recurseDueTime);
|
|
|
- }
|
|
|
+ if (hasCompleted)
|
|
|
+ {
|
|
|
+ base._observer.OnCompleted();
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
+ else if (hasFailed)
|
|
|
+ {
|
|
|
+ base._observer.OnError(error);
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
|
|
|
- return;
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
- } /* while (true) */
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- class LongRunningImpl : Sink<TSource>, IObserver<TSource>
|
|
|
+ internal sealed class Absolute : Base
|
|
|
{
|
|
|
- private readonly Delay<TSource> _parent;
|
|
|
+ private readonly DateTimeOffset _dueTime;
|
|
|
|
|
|
- public LongRunningImpl(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
- : base(observer, cancel)
|
|
|
+ public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
|
|
|
+ : base(source, scheduler)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
+ _dueTime = dueTime;
|
|
|
}
|
|
|
|
|
|
- private IDisposable _sourceSubscription;
|
|
|
- private SerialDisposable _cancelable;
|
|
|
- private TimeSpan _delay;
|
|
|
- private IStopwatch _watch;
|
|
|
-
|
|
|
- private object _gate;
|
|
|
- private SemaphoreSlim _evt;
|
|
|
- private CancellationTokenSource _stop;
|
|
|
- private Queue<System.Reactive.TimeInterval<TSource>> _queue;
|
|
|
- private bool _hasCompleted;
|
|
|
- private TimeSpan _completeAt;
|
|
|
- private bool _hasFailed;
|
|
|
- private Exception _exception;
|
|
|
-
|
|
|
- public IDisposable Run()
|
|
|
+ protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- _cancelable = new SerialDisposable();
|
|
|
-
|
|
|
- _gate = new object();
|
|
|
- _evt = new SemaphoreSlim(0);
|
|
|
- _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
|
|
|
- _hasCompleted = false;
|
|
|
- _completeAt = default(TimeSpan);
|
|
|
- _hasFailed = false;
|
|
|
- _exception = default(Exception);
|
|
|
-
|
|
|
- _watch = _parent._scheduler.StartStopwatch();
|
|
|
-
|
|
|
- if (_parent._dueTimeA.HasValue)
|
|
|
+ if (_scheduler.AsLongRunning() != null)
|
|
|
{
|
|
|
- var dueTimeA = _parent._dueTimeA.Value;
|
|
|
- _cancelable.Disposable = _parent._scheduler.Schedule(dueTimeA, Start);
|
|
|
+ var sink = new L(this, observer, cancel);
|
|
|
+ setSink(sink);
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- var dueTimeR = _parent._dueTimeR.Value;
|
|
|
- _delay = Scheduler.Normalize(dueTimeR);
|
|
|
- ScheduleDrain();
|
|
|
+ var sink = new _(this, observer, cancel);
|
|
|
+ setSink(sink);
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
-
|
|
|
- var sourceSubscription = new SingleAssignmentDisposable();
|
|
|
- _sourceSubscription = sourceSubscription;
|
|
|
- sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
|
|
|
-
|
|
|
- return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
|
|
|
}
|
|
|
|
|
|
- private void Start()
|
|
|
+ private sealed class _ : _<Absolute>
|
|
|
{
|
|
|
- lock (_gate)
|
|
|
+ public _(Absolute parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ : base(parent, observer, cancel)
|
|
|
{
|
|
|
- _delay = _watch.Elapsed;
|
|
|
-
|
|
|
- var oldQueue = _queue;
|
|
|
- _queue = new Queue<Reactive.TimeInterval<TSource>>();
|
|
|
-
|
|
|
- while (oldQueue.Count > 0)
|
|
|
- {
|
|
|
- var item = oldQueue.Dequeue();
|
|
|
- _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
- ScheduleDrain();
|
|
|
- }
|
|
|
+ protected override void RunCore(Absolute parent)
|
|
|
+ {
|
|
|
+ _ready = false;
|
|
|
|
|
|
- private void ScheduleDrain()
|
|
|
- {
|
|
|
- _stop = new CancellationTokenSource();
|
|
|
- _cancelable.Disposable = Disposable.Create(() => _stop.Cancel());
|
|
|
+ _cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start);
|
|
|
+ }
|
|
|
|
|
|
- _parent._scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
|
|
|
- }
|
|
|
+ private void Start()
|
|
|
+ {
|
|
|
+ var next = default(TimeSpan);
|
|
|
+ var shouldRun = false;
|
|
|
|
|
|
- public void OnNext(TSource value)
|
|
|
- {
|
|
|
- var next = _watch.Elapsed.Add(_delay);
|
|
|
+ lock (_gate)
|
|
|
+ {
|
|
|
+ _delay = _watch.Elapsed;
|
|
|
|
|
|
- lock (_gate)
|
|
|
- {
|
|
|
- _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
|
|
|
+ var oldQueue = _queue;
|
|
|
+ _queue = new Queue<Reactive.TimeInterval<TSource>>();
|
|
|
|
|
|
- _evt.Release();
|
|
|
- }
|
|
|
- }
|
|
|
+ if (oldQueue.Count > 0)
|
|
|
+ {
|
|
|
+ next = oldQueue.Peek().Interval;
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- _sourceSubscription.Dispose();
|
|
|
+ while (oldQueue.Count > 0)
|
|
|
+ {
|
|
|
+ var item = oldQueue.Dequeue();
|
|
|
+ _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
|
|
|
+ }
|
|
|
|
|
|
- lock (_gate)
|
|
|
- {
|
|
|
- _queue.Clear();
|
|
|
+ shouldRun = true;
|
|
|
+ _active = true;
|
|
|
+ }
|
|
|
|
|
|
- _exception = error;
|
|
|
- _hasFailed = true;
|
|
|
+ _ready = true;
|
|
|
+ }
|
|
|
|
|
|
- _evt.Release();
|
|
|
+ if (shouldRun)
|
|
|
+ {
|
|
|
+ _cancelable.Disposable = _scheduler.Schedule(next, DrainQueue);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ private sealed class L : L<Absolute>
|
|
|
{
|
|
|
- _sourceSubscription.Dispose();
|
|
|
-
|
|
|
- var next = _watch.Elapsed.Add(_delay);
|
|
|
-
|
|
|
- lock (_gate)
|
|
|
+ public L(Absolute parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ : base(parent, observer, cancel)
|
|
|
{
|
|
|
- _completeAt = next;
|
|
|
- _hasCompleted = true;
|
|
|
+ }
|
|
|
|
|
|
- _evt.Release();
|
|
|
+ protected override void RunCore(Absolute parent)
|
|
|
+ {
|
|
|
+ _cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start);
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- private void DrainQueue(ICancelable cancel)
|
|
|
- {
|
|
|
- while (true)
|
|
|
+ private void Start()
|
|
|
{
|
|
|
- try
|
|
|
- {
|
|
|
- _evt.Wait(_stop.Token);
|
|
|
- }
|
|
|
- catch (OperationCanceledException)
|
|
|
+ lock (_gate)
|
|
|
{
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- var hasFailed = false;
|
|
|
- var error = default(Exception);
|
|
|
+ _delay = _watch.Elapsed;
|
|
|
|
|
|
- var hasValue = false;
|
|
|
- var value = default(TSource);
|
|
|
- var hasCompleted = false;
|
|
|
+ var oldQueue = _queue;
|
|
|
+ _queue = new Queue<Reactive.TimeInterval<TSource>>();
|
|
|
|
|
|
- var shouldWait = false;
|
|
|
- var waitTime = default(TimeSpan);
|
|
|
-
|
|
|
- lock (_gate)
|
|
|
- {
|
|
|
- if (_hasFailed)
|
|
|
+ while (oldQueue.Count > 0)
|
|
|
{
|
|
|
- error = _exception;
|
|
|
- hasFailed = true;
|
|
|
+ var item = oldQueue.Dequeue();
|
|
|
+ _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
|
|
|
}
|
|
|
- else
|
|
|
- {
|
|
|
- var now = _watch.Elapsed;
|
|
|
+ }
|
|
|
|
|
|
- if (_queue.Count > 0)
|
|
|
- {
|
|
|
- var next = _queue.Dequeue();
|
|
|
+ ScheduleDrain();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- hasValue = true;
|
|
|
- value = next.Value;
|
|
|
+ internal sealed class Relative : Base
|
|
|
+ {
|
|
|
+ private readonly TimeSpan _dueTime;
|
|
|
|
|
|
- var nextDue = next.Interval;
|
|
|
- if (nextDue.CompareTo(now) > 0)
|
|
|
- {
|
|
|
- shouldWait = true;
|
|
|
- waitTime = Scheduler.Normalize(nextDue.Subtract(now));
|
|
|
- }
|
|
|
- }
|
|
|
- else if (_hasCompleted)
|
|
|
- {
|
|
|
- hasCompleted = true;
|
|
|
+ public Relative(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
|
|
|
+ : base(source, scheduler)
|
|
|
+ {
|
|
|
+ _dueTime = dueTime;
|
|
|
+ }
|
|
|
|
|
|
- if (_completeAt.CompareTo(now) > 0)
|
|
|
- {
|
|
|
- shouldWait = true;
|
|
|
- waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } /* lock (_gate) */
|
|
|
+ protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
+ {
|
|
|
+ if (_scheduler.AsLongRunning() != null)
|
|
|
+ {
|
|
|
+ var sink = new L(this, observer, cancel);
|
|
|
+ setSink(sink);
|
|
|
+ return sink.Run(this);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ var sink = new _(this, observer, cancel);
|
|
|
+ setSink(sink);
|
|
|
+ return sink.Run(this);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- if (shouldWait)
|
|
|
- {
|
|
|
- var timer = new ManualResetEventSlim();
|
|
|
- _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
|
|
|
+ private sealed class _ : _<Relative>
|
|
|
+ {
|
|
|
+ public _(Relative parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ : base(parent, observer, cancel)
|
|
|
+ {
|
|
|
+ }
|
|
|
|
|
|
- try
|
|
|
- {
|
|
|
- timer.Wait(_stop.Token);
|
|
|
- }
|
|
|
- catch (OperationCanceledException)
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
+ protected override void RunCore(Relative parent)
|
|
|
+ {
|
|
|
+ _ready = true;
|
|
|
|
|
|
- if (hasValue)
|
|
|
- {
|
|
|
- base._observer.OnNext(value);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- if (hasCompleted)
|
|
|
- {
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
- else if (hasFailed)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
+ _delay = Scheduler.Normalize(parent._dueTime);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- return;
|
|
|
- }
|
|
|
+ private sealed class L : L<Relative>
|
|
|
+ {
|
|
|
+ public L(Relative parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ : base(parent, observer, cancel)
|
|
|
+ {
|
|
|
+ }
|
|
|
+
|
|
|
+ protected override void RunCore(Relative parent)
|
|
|
+ {
|
|
|
+ _delay = Scheduler.Normalize(parent._dueTime);
|
|
|
+ ScheduleDrain();
|
|
|
}
|
|
|
}
|
|
|
}
|