Browse Source

Fixed a bug in EventLoopScheduler that may cause SemaphoreFullException to be thrown when under heavy load due to incongruent calls to Sempahore.Release and Semaphore.Wait.

davedev 12 years ago
parent
commit
ebd175d608

+ 10 - 0
Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs

@@ -295,6 +295,16 @@ namespace System.Reactive.Concurrency
 
                 lock (_gate)
                 {
+                    //
+                    // Bug fix that ensures the number of calls to Release never greatly exceeds the number of calls to Wait.
+                    // See work item #37: https://rx.codeplex.com/workitem/37
+                    //
+#if !NO_CDS
+                    while (_evt.CurrentCount > 0) _evt.Wait();
+#else
+                    while (_evt.WaitOne(TimeSpan.Zero)) { }
+#endif
+
                     //
                     // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the
                     // loop immediately. Subsequent calls to Schedule won't ever create a new thread.

+ 67 - 0
Rx.NET/Source/Tests.System.Reactive/Stress/Core/Schedulers/EventLoop.cs

@@ -0,0 +1,67 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if STRESS
+using System;
+using System.Linq;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Reflection;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace ReactiveTests.Stress.Schedulers
+{
+    /// <summary>
+    /// Test for <see href="https://rx.codeplex.com/workitem/37">work item #37</see>.
+    /// </summary>
+    public static class EventLoop
+    {
+        private static readonly FieldInfo semaphore = typeof(EventLoopScheduler).GetField("_evt", BindingFlags.NonPublic | BindingFlags.Instance);
+
+        public static void NoSemaphoreFullException()
+        {
+            var failed = new TaskCompletionSource<int>();
+
+            using (var scheduler = new EventLoopScheduler())
+            {
+                Assert.AreEqual(0, scheduler.CurrentCount());
+
+                var maxCount = Environment.ProcessorCount;
+
+                using (Enumerable.Range(1, maxCount)
+                    .Select(_ => scheduler.SchedulePeriodic(TimeSpan.Zero, () =>
+                    {
+                        var count = scheduler.CurrentCount();
+
+                        if (count > maxCount)
+                            failed.SetResult(count);
+                    }))
+                    .Aggregate(
+                        new CompositeDisposable(),
+                        (c, d) =>
+                        {
+                            c.Add(d);
+                            return c;
+                        }))
+                {
+                    if (failed.Task.Wait(TimeSpan.FromSeconds(10)))
+                    {
+                        Assert.Fail("Semaphore count is too high: {0}", failed.Task.Result);
+                    }
+                }
+            }
+        }
+
+        private static int CurrentCount(this EventLoopScheduler scheduler)
+        {
+#if !NO_CDS
+            return ((SemaphoreSlim)semaphore.GetValue(scheduler)).CurrentCount;
+#else
+            return 0;
+#endif
+        }
+    }
+}
+#endif

+ 1 - 0
Rx.NET/Source/Tests.System.Reactive/Tests.System.Reactive.csproj

@@ -63,6 +63,7 @@
     <Compile Include="Semaphore.cs" />
     <Compile Include="Stress\Core\Disposables\Composite.cs" />
     <Compile Include="Stress\Core\Disposables\Serial.cs" />
+    <Compile Include="Stress\Core\Schedulers\EventLoop.cs" />
     <Compile Include="Stress\Core\Disposables\SingleAssignment.cs" />
     <Compile Include="Stress\Core\Disposables\RefCount.cs" />
     <Compile Include="Stress\Linq\Delay.cs" />

+ 11 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs

@@ -7,6 +7,9 @@ using System.Reactive.Concurrency;
 using System.Threading;
 using Microsoft.Reactive.Testing;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
+#if STRESS
+using ReactiveTests.Stress.Schedulers;
+#endif
 
 namespace ReactiveTests.Tests
 {
@@ -370,5 +373,13 @@ namespace ReactiveTests.Tests
                 d.Dispose();
             }
         }
+
+#if STRESS
+        [TestMethod]
+        public void EventLoop_Stress()
+        {
+            EventLoop.NoSemaphoreFullException();
+        }
+#endif
     }
 }