Browse Source

Improve Timer() scheduler handling (#700)

David Karnok 7 years ago
parent
commit
837e65ba63
1 changed files with 19 additions and 14 deletions
  1. 19 14
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs

+ 19 - 14
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs

@@ -120,6 +120,8 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly TimeSpan _period;
 
+                long _index;
+
                 public _(TimeSpan period, IObserver<long> observer)
                     : base(observer)
                 {
@@ -128,7 +130,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public void Run(Periodic parent, DateTimeOffset dueTime)
                 {
-                    SetUpstream(parent._scheduler.Schedule(default(object), dueTime, InvokeStart));
+                    SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
                 }
 
                 public void Run(Periodic parent, TimeSpan dueTime)
@@ -137,9 +139,9 @@ namespace System.Reactive.Linq.ObservableImpl
                     // Optimize for the case of Observable.Interval.
                     //
                     if (dueTime == _period)
-                        SetUpstream(parent._scheduler.SchedulePeriodic(0L, _period, (Func<long, long>)Tick));
+                        SetUpstream(parent._scheduler.SchedulePeriodic(this, _period, @this => @this.Tick()));
                     else
-                        SetUpstream(parent._scheduler.Schedule(default(object), dueTime, InvokeStart));
+                        SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
                 }
 
                 //
@@ -156,16 +158,18 @@ namespace System.Reactive.Linq.ObservableImpl
                 //   selectors. When the system clock changes, intervals will not be the same as diffs between
                 //   consecutive absolute time values. The precision will be low (1s range by default).
                 //
-                private long Tick(long count)
+                private void Tick()
                 {
+                    var count = _index;
+                    _index = unchecked(count + 1);
+
                     ForwardOnNext(count);
-                    return unchecked(count + 1);
                 }
 
                 private int _pendingTickCount;
                 private IDisposable _periodic;
 
-                private IDisposable InvokeStart(IScheduler self, object state)
+                private IDisposable InvokeStart(IScheduler self)
                 {
                     //
                     // Notice the first call to OnNext will introduce skew if it takes significantly long when
@@ -218,7 +222,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     var d = new SingleAssignmentDisposable();
                     _periodic = d;
-                    d.Disposable = self.SchedulePeriodic(1L, _period, (Func<long, long>)Tock);
+                    _index = 1;
+                    d.Disposable = self.SchedulePeriodic(this, _period, @this => @this.Tock());
 
                     try
                     {
@@ -238,8 +243,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     //
                     if (Interlocked.Decrement(ref _pendingTickCount) > 0)
                     {
-                        var c = new SingleAssignmentDisposable();
-                        c.Disposable = self.Schedule(1L, CatchUp);
+                        var c = self.Schedule((@this: this, index: 1L), (tuple, action) => [email protected](tuple.index, action));
 
                         return StableCompositeDisposable.Create(d, c);
                     }
@@ -247,7 +251,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     return d;
                 }
 
-                private long Tock(long count)
+                private void Tock()
                 {
                     //
                     // Notice the handler for (emulated) periodic scheduling is non-reentrant.
@@ -261,14 +265,15 @@ namespace System.Reactive.Linq.ObservableImpl
                     //
                     if (Interlocked.Increment(ref _pendingTickCount) == 1)
                     {
+                        var count = _index;
+                        _index = unchecked(count + 1);
+
                         ForwardOnNext(count);
                         Interlocked.Decrement(ref _pendingTickCount);
                     }
-
-                    return unchecked(count + 1);
                 }
 
-                private void CatchUp(long count, Action<long> recurse)
+                private void CatchUp(long count, Action<(_, long)> recurse)
                 {
                     try
                     {
@@ -286,7 +291,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     //
                     if (Interlocked.Decrement(ref _pendingTickCount) > 0)
                     {
-                        recurse(unchecked(count + 1));
+                        recurse((this, unchecked(count + 1)));
                     }
                 }
             }