Browse Source

Allowing to use delayed tasks with a duration longer than int.MaxValue milliseconds.

Bart De Smet 10 years ago
parent
commit
39a3fa8056

+ 3 - 16
Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs

@@ -412,11 +412,7 @@ namespace System.Reactive.Concurrency
         public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime)
         {
             var cancel = new CancellationDisposable();            
-#if USE_TASKEX
-            TaskEx.Delay(dueTime, cancel.Token).ContinueWith(
-#else
-            Task.Delay(dueTime, cancel.Token).ContinueWith(
-#endif
+            TaskHelpers.Delay(dueTime, cancel.Token).ContinueWith(
                 _ => action(state),
                 TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion
             );
@@ -436,11 +432,7 @@ namespace System.Reactive.Concurrency
                 var moveNext = default(Action);
                 moveNext = () =>
                 {
-#if USE_TASKEX
-                TaskEx.Delay(period, cancel.Token).ContinueWith(
-#else
-                    Task.Delay(period, cancel.Token).ContinueWith(
-#endif
+                    TaskHelpers.Delay(period, cancel.Token).ContinueWith(
                         _ =>
                         {
                             moveNext();
@@ -465,12 +457,7 @@ namespace System.Reactive.Concurrency
         
         public void Sleep(TimeSpan timeout)
         {
-#if USE_TASKEX
-            TaskEx.Delay(timeout).Wait();
-#else
-            Task.Delay(timeout).Wait();
-#endif
-
+            TaskHelpers.Delay(timeout).Wait();
         }
 
         public IStopwatch StartStopwatch()

+ 44 - 0
Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/TaskHelpers.cs

@@ -0,0 +1,44 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_TPL && !NO_TASK_DELAY
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Reactive.Concurrency
+{
+    static class TaskHelpers
+    {
+        private const int MAX_DELAY = int.MaxValue;
+
+        public static Task Delay(TimeSpan delay)
+        {
+            return Delay(delay, CancellationToken.None);
+        }
+
+        public static Task Delay(TimeSpan delay, CancellationToken token)
+        {
+            var milliseconds = (long)delay.TotalMilliseconds;
+
+            if (milliseconds > MAX_DELAY)
+            {
+                var remainder = delay - TimeSpan.FromMilliseconds(MAX_DELAY);
+
+                return
+#if USE_TASKEX
+                    TaskEx.Delay(MAX_DELAY, token)
+#else
+                    Task.Delay(MAX_DELAY, token)
+#endif
+                        .ContinueWith(_ => Delay(remainder, token), TaskContinuationOptions.ExecuteSynchronously)
+                        .Unwrap();
+            }
+
+#if USE_TASKEX
+            return TaskEx.Delay(delay, token);
+#else
+            return Task.Delay(delay, token);
+#endif
+        }
+    }
+}
+#endif

+ 1 - 0
Rx.NET/Source/System.Reactive.Core/System.Reactive.Core.csproj

@@ -69,6 +69,7 @@
     <Compile Include="Reactive\Concurrency\Synchronization.Synchronize.cs" />
     <Compile Include="Reactive\Concurrency\SynchronizationContextScheduler.cs" />
     <Compile Include="Reactive\Concurrency\DefaultScheduler.cs" />
+    <Compile Include="Reactive\Concurrency\TaskHelpers.cs" />
     <Compile Include="Reactive\Disposables\StableCompositeDisposable.cs" />
     <Compile Include="Reactive\Internal\AsyncLockObserver.cs" />
     <Compile Include="Reactive\Internal\CheckedObserver.cs" />

+ 2 - 10
Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs

@@ -111,11 +111,7 @@ namespace System.Reactive.Concurrency
             var ct = new CancellationDisposable();
             d.Disposable = ct;
 
-#if USE_TASKEX
-            TaskEx.Delay(dueTime, ct.Token).ContinueWith(_ =>
-#else
-            Task.Delay(dueTime, ct.Token).ContinueWith(_ =>
-#endif
+            TaskHelpers.Delay(dueTime, ct.Token).ContinueWith(_ =>
             {
                 if (!d.IsDisposed)
                     d.Disposable = action(this, state);
@@ -194,11 +190,7 @@ namespace System.Reactive.Concurrency
             var moveNext = default(Action);
             moveNext = () =>
             {
-#if USE_TASKEX
-                TaskEx.Delay(period, cancel.Token).ContinueWith(
-#else
-                Task.Delay(period, cancel.Token).ContinueWith(
-#endif
+                TaskHelpers.Delay(period, cancel.Token).ContinueWith(
                     _ =>
                     {
                         moveNext();

+ 11 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Concurrency/TaskPoolSchedulerTest.cs

@@ -160,6 +160,17 @@ namespace ReactiveTests.Tests
 
             Assert.IsFalse(fail);
         }
+
+        [TestMethod]
+        public void TaskPool_Delay_LargerThanIntMaxValue()
+        {
+            var dueTime = TimeSpan.FromMilliseconds((double)int.MaxValue + 1);
+
+            // Just ensuring the call to Schedule does not throw.
+            var d = TaskPoolScheduler.Default.Schedule(dueTime, () => { });
+
+            d.Dispose();
+        }
     }
 }
 #endif

+ 1 - 1
Rx.NET/Source/Tests.System.Reactive/Tests/DefaultConcurrencyAbstractionLayerTest.cs

@@ -257,7 +257,7 @@ namespace ReactiveTests.Tests
 
                 if (!success)
                 {
-                    res.Value = "Failed null check ScheduleLongRunnign.";
+                    res.Value = "Failed null check ScheduleLongRunning.";
                     state.Value.Set();
                     return;
                 }