Browse Source

Making VirtualTimeScheduler thread-safe.

Bart De Smet 10 years ago
parent
commit
b43e9149ae

+ 20 - 8
Rx.NET/Source/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs

@@ -373,14 +373,18 @@ namespace System.Reactive.Concurrency
         /// <returns>The next scheduled item.</returns>
         protected override IScheduledItem<TAbsolute> GetNext()
         {
-            while (queue.Count > 0)
+            lock (queue)
             {
-                var next = queue.Peek();
-                if (next.IsCanceled)
-                    queue.Dequeue();
-                else
-                    return next;
+                while (queue.Count > 0)
+                {
+                    var next = queue.Peek();
+                    if (next.IsCanceled)
+                        queue.Dequeue();
+                    else
+                        return next;
+                }
             }
+
             return null;
         }
 
@@ -402,12 +406,20 @@ namespace System.Reactive.Concurrency
 
             var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) =>
             {
-                queue.Remove(si);
+                lock (queue)
+                {
+                    queue.Remove(si);
+                }
+
                 return action(scheduler, state1);
             });
 
             si = new ScheduledItem<TAbsolute, TState>(this, state, run, dueTime, Comparer);
-            queue.Enqueue(si);
+
+            lock (queue)
+            {
+                queue.Enqueue(si);
+            }
 
             return Disposable.Create(si.Cancel);
         }

+ 35 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Concurrency/VirtualSchedulerTest.cs

@@ -4,6 +4,7 @@ using System;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Reactive.Concurrency;
+using System.Reactive.Linq;
 using System.Threading;
 using Microsoft.Reactive.Testing;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
@@ -123,5 +124,39 @@ namespace ReactiveTests.Tests
             Assert.IsTrue(sw.ElapsedMilliseconds > 180, "due " + sw.ElapsedMilliseconds);
         }
 #endif
+
+#if DESKTOPCLR
+        [TestMethod]
+        public void Virtual_ThreadSafety()
+        {
+            for (var i = 0; i < 10; i++)
+            {
+                var scheduler = new TestScheduler();
+                var seq = Observable.Never<string>();
+
+                ThreadPool.QueueUserWorkItem(_ =>
+                {
+                    Thread.Sleep(50);
+                    seq.Timeout(TimeSpan.FromSeconds(10), scheduler).Subscribe(s => { });
+                });
+
+                var watch = scheduler.StartStopwatch();
+                try
+                {
+                    while (watch.Elapsed < TimeSpan.FromSeconds(20))
+                    {
+                        scheduler.AdvanceBy(10);
+                    }
+                }
+                catch (TimeoutException)
+                {
+                }
+                catch (Exception ex)
+                {
+                    Assert.Fail("Virtual time {0}, exception {1}", watch.Elapsed, ex);
+                }
+            }
+        }
+#endif
     }
 }