| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System.Reactive.Concurrency;using System.Reactive.Disposables;using System.Threading;namespace System.Reactive.Linq.ObservableImpl{    internal sealed class Timer : Producer<long>    {        private readonly DateTimeOffset? _dueTimeA;        private readonly TimeSpan? _dueTimeR;        private readonly TimeSpan? _period;        private readonly IScheduler _scheduler;        public Timer(DateTimeOffset dueTime, TimeSpan? period, IScheduler scheduler)        {            _dueTimeA = dueTime;            _period = period;            _scheduler = scheduler;        }        public Timer(TimeSpan dueTime, TimeSpan? period, IScheduler scheduler)        {            _dueTimeR = dueTime;            _period = period;            _scheduler = scheduler;        }        protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)        {            if (_period.HasValue)            {                var sink = new TimerImpl(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else            {                var sink = new _(this, observer, cancel);                setSink(sink);                return sink.Run();            }        }        class _ : Sink<long>        {            private readonly Timer _parent;            public _(Timer parent, IObserver<long> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            public IDisposable Run()            {                if (_parent._dueTimeA.HasValue)                {                    return _parent._scheduler.Schedule(_parent._dueTimeA.Value, Invoke);                }                else                {                    return _parent._scheduler.Schedule(_parent._dueTimeR.Value, Invoke);                }            }            private void Invoke()            {                base._observer.OnNext(0);                base._observer.OnCompleted();                base.Dispose();            }        }        class TimerImpl : Sink<long>        {            private readonly Timer _parent;            private readonly TimeSpan _period;            public TimerImpl(Timer parent, IObserver<long> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;                _period = _parent._period.Value;            }            public IDisposable Run()            {                if (_parent._dueTimeA.HasValue)                {                    var dueTime = _parent._dueTimeA.Value;                    return _parent._scheduler.Schedule(default(object), dueTime, InvokeStart);                }                else                {                    var dueTime = _parent._dueTimeR.Value;                    //                    // Optimize for the case of Observable.Interval.                    //                    if (dueTime == _period)                    {                        return _parent._scheduler.SchedulePeriodic(0L, _period, (Func<long, long>)Tick);                    }                    return _parent._scheduler.Schedule(default(object), dueTime, InvokeStart);                }            }            //            // BREAKING CHANGE v2 > v1.x - No more correction for time drift based on absolute time. This            //                             didn't work for large period values anyway; the fractional            //                             error exceeded corrections. Also complicated dealing with system            //                             clock change conditions and caused numerous bugs.            //            // - For more precise scheduling, use a custom scheduler that measures TimeSpan values in a            //   better way, e.g. spinning to make up for the last part of the period. Whether or not the            //   values of the TimeSpan period match NT time or wall clock time is up to the scheduler.            //            // - For more accurate scheduling wrt the system clock, use Generate with DateTimeOffset time            //   selectors. When the system clock changes, intervals will not be the same as diffs between            //   consecutive absolute time values. The precision will be low (1s range by default).            //            private long Tick(long count)            {                base._observer.OnNext(count);                return unchecked(count + 1);            }            private int _pendingTickCount;            private IDisposable _periodic;            private IDisposable InvokeStart(IScheduler self, object state)            {                //                // Notice the first call to OnNext will introduce skew if it takes significantly long when                // using the following naive implementation:                //                //    Code:  base._observer.OnNext(0L);                //           return self.SchedulePeriodicEmulated(1L, _period, (Func<long, long>)Tick);                //                // What we're saying here is that Observable.Timer(dueTime, period) is pretty much the same                // as writing Observable.Timer(dueTime).Concat(Observable.Interval(period)).                //                //    Expected:  dueTime                //                  |                //                  0--period--1--period--2--period--3--period--4--...                //                  |                //                  +-OnNext(0L)-|                //                    //    Actual:    dueTime                //                  |                //                  0------------#--period--1--period--2--period--3--period--4--...                //                  |                //                  +-OnNext(0L)-|                //                // Different solutions for this behavior have different problems:                //                // 1. Scheduling the periodic job first and using an AsyncLock to serialize the OnNext calls                //    has the drawback that InvokeStart may never return. This happens when every callback                //    doesn't meet the period's deadline, hence the periodic job keeps queueing stuff up. In                //    this case, InvokeStart stays the owner of the AsyncLock and the call to Wait will never                //    return, thus not allowing any interleaving of work on this scheduler's logical thread.                //                // 2. Scheduling the periodic job first and using a (blocking) synchronization primitive to                //    signal completion of the OnNext(0L) call to the Tick call requires quite a bit of state                //    and careful handling of the case when OnNext(0L) throws. What's worse is the blocking                //    behavior inside Tick.                //                // In order to avoid blocking behavior, we need a scheme much like SchedulePeriodic emulation                // where work to dispatch OnNext(n + 1) is delegated to a catch up loop in case OnNext(n) was                // still running. Because SchedulePeriodic emulation exhibits such behavior in all cases, we                // only need to deal with the overlap of OnNext(0L) with future periodic OnNext(n) dispatch                // jobs. In the worst case where every callback takes longer than the deadline implied by the                // period, the periodic job will just queue up work that's dispatched by the tail-recursive                // catch up loop. In the best case, all work will be dispatched on the periodic scheduler.                //                //                // We start with one tick pending because we're about to start doing OnNext(0L).                //                _pendingTickCount = 1;                var d = new SingleAssignmentDisposable();                _periodic = d;                d.Disposable = self.SchedulePeriodic(1L, _period, (Func<long, long>)Tock);                try                {                    base._observer.OnNext(0L);                }                catch (Exception e)                {                    d.Dispose();                    e.Throw();                }                //                // If the periodic scheduling job already ran before we finished dispatching the OnNext(0L)                // call, we'll find pendingTickCount to be > 1. In this case, we need to catch up by dispatching                // subsequent calls to OnNext as fast as possible, but without running a loop in order to ensure                // fair play with the scheduler. So, we run a tail-recursive loop in CatchUp instead.                //                if (Interlocked.Decrement(ref _pendingTickCount) > 0)                {                    var c = new SingleAssignmentDisposable();                    c.Disposable = self.Schedule(1L, CatchUp);                    return StableCompositeDisposable.Create(d, c);                }                return d;            }            private long Tock(long count)            {                //                // Notice the handler for (emulated) periodic scheduling is non-reentrant.                //                // When there's no overlap with the OnNext(0L) call, the following code will cycle through                // pendingTickCount 0 -> 1 -> 0 for the remainder of the timer's execution.                //                // If there's overlap with the OnNext(0L) call, pendingTickCount will increase to record                // the number of catch up OnNext calls required, which will be dispatched by the recursive                // scheduling loop in CatchUp (which quits when it reaches 0 pending ticks).                //                if (Interlocked.Increment(ref _pendingTickCount) == 1)                {                    base._observer.OnNext(count);                    Interlocked.Decrement(ref _pendingTickCount);                }                return unchecked(count + 1);            }            private void CatchUp(long count, Action<long> recurse)            {                try                {                    base._observer.OnNext(count);                }                catch (Exception e)                {                    _periodic.Dispose();                    e.Throw();                }                //                // We can simply bail out if we decreased the tick count to 0. In that case, the Tock                // method will take over when it sees the 0 -> 1 transition.                //                if (Interlocked.Decrement(ref _pendingTickCount) > 0)                {                    recurse(unchecked(count + 1));                }            }        }    }}
 |