1
0
Эх сурвалжийг харах

Optimizing layouts of Timer variants.

Bart De Smet 8 жил өмнө
parent
commit
466c01bef8

+ 231 - 184
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs

@@ -8,253 +8,300 @@ using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Timer : Producer<long>
+    internal static class Timer
     {
-        private readonly DateTimeOffset? _dueTimeA;
-        private readonly TimeSpan? _dueTimeR;
-        private readonly TimeSpan? _period;
-        private readonly IScheduler _scheduler;
-
-        public Timer(DateTimeOffset dueTime, TimeSpan? period, IScheduler scheduler)
-        {
-            _dueTimeA = dueTime;
-            _period = period;
-            _scheduler = scheduler;
-        }
-
-        public Timer(TimeSpan dueTime, TimeSpan? period, IScheduler scheduler)
+        internal abstract class Single : Producer<long>
         {
-            _dueTimeR = dueTime;
-            _period = period;
-            _scheduler = scheduler;
-        }
+            private readonly IScheduler _scheduler;
 
-        protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_period.HasValue)
+            public Single(IScheduler scheduler)
             {
-                var sink = new TimerImpl(this, observer, cancel);
-                setSink(sink);
-                return sink.Run();
+                _scheduler = scheduler;
             }
-            else
+
+            internal sealed class Relative : Single
             {
-                var sink = new _(this, observer, cancel);
-                setSink(sink);
-                return sink.Run();
-            }
-        }
+                private readonly TimeSpan _dueTime;
 
-        class _ : Sink<long>
-        {
-            private readonly Timer _parent;
+                public Relative(TimeSpan dueTime, IScheduler scheduler)
+                    : base(scheduler)
+                {
+                    _dueTime = dueTime;
+                }
 
-            public _(Timer parent, IObserver<long> observer, IDisposable cancel)
-                : base(observer, cancel)
-            {
-                _parent = parent;
+                protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
+                {
+                    var sink = new _(observer, cancel);
+                    setSink(sink);
+                    return sink.Run(this, _dueTime);
+                }
             }
 
-            public IDisposable Run()
+            internal sealed class Absolute : Single
             {
-                if (_parent._dueTimeA.HasValue)
+                private readonly DateTimeOffset _dueTime;
+
+                public Absolute(DateTimeOffset dueTime, IScheduler scheduler)
+                    : base(scheduler)
                 {
-                    return _parent._scheduler.Schedule(_parent._dueTimeA.Value, Invoke);
+                    _dueTime = dueTime;
                 }
-                else
+
+                protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
                 {
-                    return _parent._scheduler.Schedule(_parent._dueTimeR.Value, Invoke);
+                    var sink = new _(observer, cancel);
+                    setSink(sink);
+                    return sink.Run(this, _dueTime);
                 }
             }
 
-            private void Invoke()
+            private sealed class _ : Sink<long>
             {
-                base._observer.OnNext(0);
-                base._observer.OnCompleted();
-                base.Dispose();
+                public _(IObserver<long> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                }
+
+                public IDisposable Run(Single parent, DateTimeOffset dueTime)
+                {
+                    return parent._scheduler.Schedule(dueTime, Invoke);
+                }
+
+                public IDisposable Run(Single parent, TimeSpan dueTime)
+                {
+                    return parent._scheduler.Schedule(dueTime, Invoke);
+                }
+
+                private void Invoke()
+                {
+                    base._observer.OnNext(0);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
 
-        class TimerImpl : Sink<long>
+        internal abstract class Periodic : Producer<long>
         {
-            private readonly Timer _parent;
             private readonly TimeSpan _period;
+            private readonly IScheduler _scheduler;
 
-            public TimerImpl(Timer parent, IObserver<long> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Periodic(TimeSpan period, IScheduler scheduler)
             {
-                _parent = parent;
-                _period = _parent._period.Value;
+                _period = period;
+                _scheduler = scheduler;
             }
 
-            public IDisposable Run()
+            internal sealed class Relative : Periodic
             {
-                if (_parent._dueTimeA.HasValue)
+                private readonly TimeSpan _dueTime;
+
+                public Relative(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
+                    : base(period, scheduler)
                 {
-                    var dueTime = _parent._dueTimeA.Value;
-                    return _parent._scheduler.Schedule(default(object), dueTime, InvokeStart);
+                    _dueTime = dueTime;
                 }
-                else
-                {
-                    var dueTime = _parent._dueTimeR.Value;
 
-                    //
-                    // Optimize for the case of Observable.Interval.
-                    //
-                    if (dueTime == _period)
-                    {
-                        return _parent._scheduler.SchedulePeriodic(0L, _period, (Func<long, long>)Tick);
-                    }
-
-                    return _parent._scheduler.Schedule(default(object), dueTime, InvokeStart);
+                protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
+                {
+                    var sink = new _(_period, observer, cancel);
+                    setSink(sink);
+                    return sink.Run(this, _dueTime);
                 }
             }
 
-            //
-            // BREAKING CHANGE v2 > v1.x - No more correction for time drift based on absolute time. This
-            //                             didn't work for large period values anyway; the fractional
-            //                             error exceeded corrections. Also complicated dealing with system
-            //                             clock change conditions and caused numerous bugs.
-            //
-            // - For more precise scheduling, use a custom scheduler that measures TimeSpan values in a
-            //   better way, e.g. spinning to make up for the last part of the period. Whether or not the
-            //   values of the TimeSpan period match NT time or wall clock time is up to the scheduler.
-            //
-            // - For more accurate scheduling wrt the system clock, use Generate with DateTimeOffset time
-            //   selectors. When the system clock changes, intervals will not be the same as diffs between
-            //   consecutive absolute time values. The precision will be low (1s range by default).
-            //
-            private long Tick(long count)
+            internal sealed class Absolute : Periodic
             {
-                base._observer.OnNext(count);
-                return unchecked(count + 1);
-            }
-
-            private int _pendingTickCount;
-            private IDisposable _periodic;
+                private readonly DateTimeOffset _dueTime;
 
-            private IDisposable InvokeStart(IScheduler self, object state)
-            {
-                //
-                // Notice the first call to OnNext will introduce skew if it takes significantly long when
-                // using the following naive implementation:
-                //
-                //    Code:  base._observer.OnNext(0L);
-                //           return self.SchedulePeriodicEmulated(1L, _period, (Func<long, long>)Tick);
-                //
-                // What we're saying here is that Observable.Timer(dueTime, period) is pretty much the same
-                // as writing Observable.Timer(dueTime).Concat(Observable.Interval(period)).
-                //
-                //    Expected:  dueTime
-                //                  |
-                //                  0--period--1--period--2--period--3--period--4--...
-                //                  |
-                //                  +-OnNext(0L)-|
-                //    
-                //    Actual:    dueTime
-                //                  |
-                //                  0------------#--period--1--period--2--period--3--period--4--...
-                //                  |
-                //                  +-OnNext(0L)-|
-                //
-                // Different solutions for this behavior have different problems:
-                //
-                // 1. Scheduling the periodic job first and using an AsyncLock to serialize the OnNext calls
-                //    has the drawback that InvokeStart may never return. This happens when every callback
-                //    doesn't meet the period's deadline, hence the periodic job keeps queueing stuff up. In
-                //    this case, InvokeStart stays the owner of the AsyncLock and the call to Wait will never
-                //    return, thus not allowing any interleaving of work on this scheduler's logical thread.
-                //
-                // 2. Scheduling the periodic job first and using a (blocking) synchronization primitive to
-                //    signal completion of the OnNext(0L) call to the Tick call requires quite a bit of state
-                //    and careful handling of the case when OnNext(0L) throws. What's worse is the blocking
-                //    behavior inside Tick.
-                //
-                // In order to avoid blocking behavior, we need a scheme much like SchedulePeriodic emulation
-                // where work to dispatch OnNext(n + 1) is delegated to a catch up loop in case OnNext(n) was
-                // still running. Because SchedulePeriodic emulation exhibits such behavior in all cases, we
-                // only need to deal with the overlap of OnNext(0L) with future periodic OnNext(n) dispatch
-                // jobs. In the worst case where every callback takes longer than the deadline implied by the
-                // period, the periodic job will just queue up work that's dispatched by the tail-recursive
-                // catch up loop. In the best case, all work will be dispatched on the periodic scheduler.
-                //
+                public Absolute(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
+                    : base(period, scheduler)
+                {
+                    _dueTime = dueTime;
+                }
 
-                //
-                // We start with one tick pending because we're about to start doing OnNext(0L).
-                //
-                _pendingTickCount = 1;
+                protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
+                {
+                    var sink = new _(_period, observer, cancel);
+                    setSink(sink);
+                    return sink.Run(this, _dueTime);
+                }
+            }
 
-                var d = new SingleAssignmentDisposable();
-                _periodic = d;
-                d.Disposable = self.SchedulePeriodic(1L, _period, (Func<long, long>)Tock);
+            private sealed class _ : Sink<long>
+            {
+                private readonly TimeSpan _period;
 
-                try
+                public _(TimeSpan period, IObserver<long> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    base._observer.OnNext(0L);
+                    _period = period;
                 }
-                catch (Exception e)
+
+                public IDisposable Run(Periodic parent, DateTimeOffset dueTime)
                 {
-                    d.Dispose();
-                    e.Throw();
+                    return parent._scheduler.Schedule(default(object), dueTime, InvokeStart);
                 }
 
-                //
-                // If the periodic scheduling job already ran before we finished dispatching the OnNext(0L)
-                // call, we'll find pendingTickCount to be > 1. In this case, we need to catch up by dispatching
-                // subsequent calls to OnNext as fast as possible, but without running a loop in order to ensure
-                // fair play with the scheduler. So, we run a tail-recursive loop in CatchUp instead.
-                //
-                if (Interlocked.Decrement(ref _pendingTickCount) > 0)
+                public IDisposable Run(Periodic parent, TimeSpan dueTime)
                 {
-                    var c = new SingleAssignmentDisposable();
-                    c.Disposable = self.Schedule(1L, CatchUp);
+                    //
+                    // Optimize for the case of Observable.Interval.
+                    //
+                    if (dueTime == _period)
+                    {
+                        return parent._scheduler.SchedulePeriodic(0L, _period, (Func<long, long>)Tick);
+                    }
 
-                    return StableCompositeDisposable.Create(d, c);
+                    return parent._scheduler.Schedule(default(object), dueTime, InvokeStart);
                 }
 
-                return d;
-            }
-
-            private long Tock(long count)
-            {
                 //
-                // Notice the handler for (emulated) periodic scheduling is non-reentrant.
+                // BREAKING CHANGE v2 > v1.x - No more correction for time drift based on absolute time. This
+                //                             didn't work for large period values anyway; the fractional
+                //                             error exceeded corrections. Also complicated dealing with system
+                //                             clock change conditions and caused numerous bugs.
                 //
-                // When there's no overlap with the OnNext(0L) call, the following code will cycle through
-                // pendingTickCount 0 -> 1 -> 0 for the remainder of the timer's execution.
+                // - For more precise scheduling, use a custom scheduler that measures TimeSpan values in a
+                //   better way, e.g. spinning to make up for the last part of the period. Whether or not the
+                //   values of the TimeSpan period match NT time or wall clock time is up to the scheduler.
                 //
-                // If there's overlap with the OnNext(0L) call, pendingTickCount will increase to record
-                // the number of catch up OnNext calls required, which will be dispatched by the recursive
-                // scheduling loop in CatchUp (which quits when it reaches 0 pending ticks).
+                // - For more accurate scheduling wrt the system clock, use Generate with DateTimeOffset time
+                //   selectors. When the system clock changes, intervals will not be the same as diffs between
+                //   consecutive absolute time values. The precision will be low (1s range by default).
                 //
-                if (Interlocked.Increment(ref _pendingTickCount) == 1)
+                private long Tick(long count)
                 {
                     base._observer.OnNext(count);
-                    Interlocked.Decrement(ref _pendingTickCount);
+                    return unchecked(count + 1);
                 }
 
-                return unchecked(count + 1);
-            }
+                private int _pendingTickCount;
+                private IDisposable _periodic;
 
-            private void CatchUp(long count, Action<long> recurse)
-            {
-                try
+                private IDisposable InvokeStart(IScheduler self, object state)
                 {
-                    base._observer.OnNext(count);
+                    //
+                    // Notice the first call to OnNext will introduce skew if it takes significantly long when
+                    // using the following naive implementation:
+                    //
+                    //    Code:  base._observer.OnNext(0L);
+                    //           return self.SchedulePeriodicEmulated(1L, _period, (Func<long, long>)Tick);
+                    //
+                    // What we're saying here is that Observable.Timer(dueTime, period) is pretty much the same
+                    // as writing Observable.Timer(dueTime).Concat(Observable.Interval(period)).
+                    //
+                    //    Expected:  dueTime
+                    //                  |
+                    //                  0--period--1--period--2--period--3--period--4--...
+                    //                  |
+                    //                  +-OnNext(0L)-|
+                    //    
+                    //    Actual:    dueTime
+                    //                  |
+                    //                  0------------#--period--1--period--2--period--3--period--4--...
+                    //                  |
+                    //                  +-OnNext(0L)-|
+                    //
+                    // Different solutions for this behavior have different problems:
+                    //
+                    // 1. Scheduling the periodic job first and using an AsyncLock to serialize the OnNext calls
+                    //    has the drawback that InvokeStart may never return. This happens when every callback
+                    //    doesn't meet the period's deadline, hence the periodic job keeps queueing stuff up. In
+                    //    this case, InvokeStart stays the owner of the AsyncLock and the call to Wait will never
+                    //    return, thus not allowing any interleaving of work on this scheduler's logical thread.
+                    //
+                    // 2. Scheduling the periodic job first and using a (blocking) synchronization primitive to
+                    //    signal completion of the OnNext(0L) call to the Tick call requires quite a bit of state
+                    //    and careful handling of the case when OnNext(0L) throws. What's worse is the blocking
+                    //    behavior inside Tick.
+                    //
+                    // In order to avoid blocking behavior, we need a scheme much like SchedulePeriodic emulation
+                    // where work to dispatch OnNext(n + 1) is delegated to a catch up loop in case OnNext(n) was
+                    // still running. Because SchedulePeriodic emulation exhibits such behavior in all cases, we
+                    // only need to deal with the overlap of OnNext(0L) with future periodic OnNext(n) dispatch
+                    // jobs. In the worst case where every callback takes longer than the deadline implied by the
+                    // period, the periodic job will just queue up work that's dispatched by the tail-recursive
+                    // catch up loop. In the best case, all work will be dispatched on the periodic scheduler.
+                    //
+
+                    //
+                    // We start with one tick pending because we're about to start doing OnNext(0L).
+                    //
+                    _pendingTickCount = 1;
+
+                    var d = new SingleAssignmentDisposable();
+                    _periodic = d;
+                    d.Disposable = self.SchedulePeriodic(1L, _period, (Func<long, long>)Tock);
+
+                    try
+                    {
+                        base._observer.OnNext(0L);
+                    }
+                    catch (Exception e)
+                    {
+                        d.Dispose();
+                        e.Throw();
+                    }
+
+                    //
+                    // If the periodic scheduling job already ran before we finished dispatching the OnNext(0L)
+                    // call, we'll find pendingTickCount to be > 1. In this case, we need to catch up by dispatching
+                    // subsequent calls to OnNext as fast as possible, but without running a loop in order to ensure
+                    // fair play with the scheduler. So, we run a tail-recursive loop in CatchUp instead.
+                    //
+                    if (Interlocked.Decrement(ref _pendingTickCount) > 0)
+                    {
+                        var c = new SingleAssignmentDisposable();
+                        c.Disposable = self.Schedule(1L, CatchUp);
+
+                        return StableCompositeDisposable.Create(d, c);
+                    }
+
+                    return d;
                 }
-                catch (Exception e)
+
+                private long Tock(long count)
                 {
-                    _periodic.Dispose();
-                    e.Throw();
+                    //
+                    // Notice the handler for (emulated) periodic scheduling is non-reentrant.
+                    //
+                    // When there's no overlap with the OnNext(0L) call, the following code will cycle through
+                    // pendingTickCount 0 -> 1 -> 0 for the remainder of the timer's execution.
+                    //
+                    // If there's overlap with the OnNext(0L) call, pendingTickCount will increase to record
+                    // the number of catch up OnNext calls required, which will be dispatched by the recursive
+                    // scheduling loop in CatchUp (which quits when it reaches 0 pending ticks).
+                    //
+                    if (Interlocked.Increment(ref _pendingTickCount) == 1)
+                    {
+                        base._observer.OnNext(count);
+                        Interlocked.Decrement(ref _pendingTickCount);
+                    }
+
+                    return unchecked(count + 1);
                 }
 
-                //
-                // We can simply bail out if we decreased the tick count to 0. In that case, the Tock
-                // method will take over when it sees the 0 -> 1 transition.
-                //
-                if (Interlocked.Decrement(ref _pendingTickCount) > 0)
+                private void CatchUp(long count, Action<long> recurse)
                 {
-                    recurse(unchecked(count + 1));
+                    try
+                    {
+                        base._observer.OnNext(count);
+                    }
+                    catch (Exception e)
+                    {
+                        _periodic.Dispose();
+                        e.Throw();
+                    }
+
+                    //
+                    // We can simply bail out if we decreased the tick count to 0. In that case, the Tock
+                    // method will take over when it sees the 0 -> 1 transition.
+                    //
+                    if (Interlocked.Decrement(ref _pendingTickCount) > 0)
+                    {
+                        recurse(unchecked(count + 1));
+                    }
                 }
             }
         }

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Time.cs

@@ -565,22 +565,22 @@ namespace System.Reactive.Linq
 
         private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler)
         {
-            return new Timer(dueTime, null, scheduler);
+            return new Timer.Single.Relative(dueTime, scheduler);
         }
 
         private static IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
         {
-            return new Timer(dueTime, period, scheduler);
+            return new Timer.Periodic.Relative(dueTime, period, scheduler);
         }
 
         private static IObservable<long> Timer_(DateTimeOffset dueTime, IScheduler scheduler)
         {
-            return new Timer(dueTime, null, scheduler);
+            return new Timer.Single.Absolute(dueTime, scheduler);
         }
 
         private static IObservable<long> Timer_(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
         {
-            return new Timer(dueTime, period, scheduler);
+            return new Timer.Periodic.Absolute(dueTime, period, scheduler);
         }
 
         #endregion