浏览代码

Fixed a bug that caused the Interval and Timer extensions and periodic scheduling in general to throw an ArgumentOutOfRangeException when period equaled TimeSpan.Zero. Includes corresponding unit tests.

davedev 12 年之前
父节点
当前提交
462115b64a

+ 94 - 25
Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs

@@ -22,16 +22,21 @@ namespace System.Reactive.Concurrency
 
         public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
         {
-            //
-            // MSDN documentation states the following:
-            //
-            //    "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once;
-            //     the periodic behavior of the timer is disabled, but can be re-enabled using the Change method."
-            //
-            if (period <= TimeSpan.Zero)
+            if (period < TimeSpan.Zero)
                 throw new ArgumentOutOfRangeException("period");
 
-            return new PeriodicTimer(action, period);
+            //
+            // The contract for periodic scheduling in Rx is that specifying TimeSpan.Zero as the period causes the scheduler to 
+            // call back periodically as fast as possible, sequentially.
+            //
+            if (period == TimeSpan.Zero)
+            {
+                return new FastPeriodicTimer(action);
+            }
+            else
+            {
+                return new PeriodicTimer(action, period);
+            }
         }
 
         public IDisposable QueueUserWorkItem(Action<object> action, object state)
@@ -362,6 +367,37 @@ namespace System.Reactive.Concurrency
             }
         }
 #endif
+
+        class FastPeriodicTimer : IDisposable
+        {
+            private readonly Action _action;
+            private bool disposed;
+
+            public FastPeriodicTimer(Action action)
+            {
+                _action = action;
+
+                new System.Threading.Thread(Loop)
+                {
+                    Name = "Rx-FastPeriodicTimer",
+                    IsBackground = true
+                }
+                .Start();
+            }
+
+            private void Loop()
+            {
+                while (!disposed)
+                {
+                    _action();
+                }
+            }
+
+            public void Dispose()
+            {
+                disposed = true;
+            }
+        }
     }
 }
 #else
@@ -389,28 +425,35 @@ namespace System.Reactive.Concurrency
 
         public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
         {
-            var cancel = new CancellationDisposable();
-
-            var moveNext = default(Action);
-            moveNext = () =>
+            if (period <= TimeSpan.Zero)
             {
+                return new FastPeriodicTimer(action);
+            }
+            else
+            {
+                var cancel = new CancellationDisposable();
+
+                var moveNext = default(Action);
+                moveNext = () =>
+                {
 #if USE_TASKEX
                 TaskEx.Delay(period, cancel.Token).ContinueWith(
 #else
-                Task.Delay(period, cancel.Token).ContinueWith(
+                    Task.Delay(period, cancel.Token).ContinueWith(
 #endif
-                    _ =>
-                    {
-                        moveNext();
-                        action();
-                    },
-                    TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion
-                );
-            };
-
-            moveNext();
-
-            return cancel;
+                        _ =>
+                        {
+                            moveNext();
+                            action();
+                        },
+                        TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion
+                    );
+                };
+
+                moveNext();
+
+                return cancel;
+            }
         }
 
         public IDisposable QueueUserWorkItem(Action<object> action, object state)
@@ -447,6 +490,32 @@ namespace System.Reactive.Concurrency
                 action(state);
             }, TaskCreationOptions.LongRunning);
         }
+
+        class FastPeriodicTimer : IDisposable
+        {
+            private readonly Action _action;
+            private bool disposed;
+
+            public FastPeriodicTimer(Action action)
+            {
+                _action = action;
+                
+                Task.Factory.StartNew(Loop, TaskCreationOptions.LongRunning);
+            }
+
+            private void Loop()
+            {
+                while (!disposed)
+                {
+                    _action();
+                }
+            }
+
+            public void Dispose()
+            {
+                disposed = true;
+            }
+        }
     }
 }
 #endif

+ 44 - 8
Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs

@@ -22,16 +22,21 @@ namespace System.Reactive.Concurrency
 
         public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
         {
-            //
-            // MSDN documentation states the following:
-            //
-            //    "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once;
-            //     the periodic behavior of the timer is disabled, but can be re-enabled using the Change method."
-            //
-            if (period <= TimeSpan.Zero)
+            if (period < TimeSpan.Zero)
                 throw new ArgumentOutOfRangeException("period");
 
-            return new PeriodicTimer(action, period);
+            //
+            // The contract for periodic scheduling in Rx is that specifying TimeSpan.Zero as the period causes the scheduler to 
+            // call back periodically as fast as possible, sequentially.
+            //
+            if (period == TimeSpan.Zero)
+            {
+                return new FastPeriodicTimer(action);
+            }
+            else
+            {
+                return new PeriodicTimer(action, period);
+            }
         }
 
         public IDisposable QueueUserWorkItem(Action<object> action, object state)
@@ -366,6 +371,37 @@ namespace System.Reactive.Concurrency
             }
         }
 #endif
+
+        class FastPeriodicTimer : IDisposable
+        {
+            private readonly Action _action;
+            private bool disposed;
+
+            public FastPeriodicTimer(Action action)
+            {
+                _action = action;
+                
+                new System.Threading.Thread(Loop)
+                {
+                    Name = "Rx-FastPeriodicTimer",
+                    IsBackground = true
+                }
+                .Start();
+            }
+            
+            private void Loop()
+            {
+                while (!disposed)
+                {
+                    _action();
+                }
+            }
+
+            public void Dispose()
+            {
+                disposed = true;
+            }
+        }
     }
 }
 #endif

+ 78 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/DefaultConcurrencyAbstractionLayerTest.cs

@@ -146,6 +146,69 @@ namespace ReactiveTests.Tests
             });
         }
 
+        [TestMethod]
+        public void StartPeriodicTimer_Fast()
+        {
+            var e = new MarshalByRefCell<ManualResetEvent> { Value = new ManualResetEvent(false) };
+            _domain.SetData("state", e);
+
+            Run(() =>
+            {
+                var n = 0;
+
+                Scheduler.Default.SchedulePeriodic(TimeSpan.Zero, () =>
+                {
+                    var state = (MarshalByRefCell<ManualResetEvent>)_domain.GetData("state");
+
+                    if (n++ == 10)
+                        state.Value.Set();
+                });
+            });
+
+            e.Value.WaitOne();
+        }
+
+        [TestMethod]
+        public void StartPeriodicTimer_Fast_Cancel()
+        {
+            var e = new MarshalByRefCell<ManualResetEvent> { Value = new ManualResetEvent(false) };
+            _domain.SetData("set_cancel", e);
+
+            Run(() =>
+            {
+                var n = 0;
+                
+                var schedule = Scheduler.Default.SchedulePeriodic(TimeSpan.Zero, () =>
+                {
+                    _domain.SetData("value", n++);
+                });
+
+                _domain.SetData("cancel", new MarshalByRefAction(schedule.Dispose));
+
+                var setCancel = (MarshalByRefCell<ManualResetEvent>)_domain.GetData("set_cancel");
+                setCancel.Value.Set();
+            });
+
+            e.Value.WaitOne();
+
+            var value = (int)_domain.GetData("value");
+
+            var cancel = (MarshalByRefAction)_domain.GetData("cancel");
+            cancel.Invoke();
+            
+            Thread.Sleep(TimeSpan.FromMilliseconds(50));
+            
+            var newValue = (int)_domain.GetData("value");
+            
+            Assert.IsTrue(newValue >= value);
+
+            Thread.Sleep(TimeSpan.FromMilliseconds(50));
+
+            value = (int)_domain.GetData("value");
+            
+            Assert.AreEqual(newValue, value);
+        }
+
         [TestMethod]
         public void CreateThread()
         {
@@ -287,5 +350,20 @@ namespace ReactiveTests.Tests
     {
         public T Value;
     }
+
+    public class MarshalByRefAction : MarshalByRefObject
+    {
+        private readonly Action _action;
+
+        public MarshalByRefAction(Action action)
+        {
+            _action = action;
+        }
+
+        public void Invoke()
+        {
+            _action();
+        }
+    }
 }
 #endif

+ 42 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableTimeTest.cs

@@ -2709,6 +2709,20 @@ namespace ReactiveTests.Tests
             );
         }
 
+        [TestMethod]
+        public void Interval_TimeSpan_Zero_DefaultScheduler()
+        {
+            var scheduler = new TestScheduler();
+            var observer = scheduler.CreateObserver<long>();
+            var completed = new ManualResetEventSlim();
+
+            Observable.Interval(TimeSpan.Zero).TakeWhile(i => i < 10).Subscribe(observer.OnNext, completed.Set);
+
+            completed.Wait();
+            
+            Assert.AreEqual(10, observer.Messages.Count);
+        }
+
         [TestMethod]
         public void Interval_TimeSpan_Disposed()
         {
@@ -7446,6 +7460,20 @@ namespace ReactiveTests.Tests
             );
         }
 
+        [TestMethod]
+        public void OneShotTimer_TimeSpan_Zero_DefaultScheduler()
+        {
+            var scheduler = new TestScheduler();
+            var observer = scheduler.CreateObserver<long>();
+            var completed = new ManualResetEventSlim();
+
+            Observable.Timer(TimeSpan.Zero).Subscribe(observer.OnNext, completed.Set);
+
+            completed.Wait();
+            
+            Assert.AreEqual(1, observer.Messages.Count);
+        }
+
         [TestMethod]
         public void OneShotTimer_TimeSpan_Negative()
         {
@@ -7563,6 +7591,20 @@ namespace ReactiveTests.Tests
             );
         }
 
+        [TestMethod]
+        public void RepeatingTimer_TimeSpan_Zero_DefaultScheduler()
+        {
+            var scheduler = new TestScheduler();
+            var observer = scheduler.CreateObserver<long>();
+            var completed = new ManualResetEventSlim();
+
+            Observable.Timer(TimeSpan.Zero, TimeSpan.Zero).TakeWhile(i => i < 10).Subscribe(observer.OnNext, completed.Set);
+
+            completed.Wait();
+
+            Assert.AreEqual(10, observer.Messages.Count);
+        }
+
         [TestMethod]
         public void RepeatingTimer_DateTimeOffset_TimeSpan_Simple()
         {