瀏覽代碼

Fix for a race condition in blocking operators.

Bart De Smet 10 年之前
父節點
當前提交
9f37553b7d

+ 34 - 5
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Blocking.cs

@@ -135,7 +135,7 @@ namespace System.Reactive.Linq
             var seenValue = false;
             var ex = default(Exception);
 
-            using (var evt = new ManualResetEvent(false))
+            using (var evt = new WaitAndSetOnce())
             {
                 //
                 // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
@@ -179,7 +179,7 @@ namespace System.Reactive.Linq
         public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource> onNext)
         {
 #if !NO_PERF
-            using (var evt = new ManualResetEvent(false))
+            using (var evt = new WaitAndSetOnce())
             {
                 var sink = new ForEach<TSource>._(onNext, () => evt.Set());
 
@@ -198,7 +198,7 @@ namespace System.Reactive.Linq
         public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource, int> onNext)
         {
 #if !NO_PERF
-            using (var evt = new ManualResetEvent(false))
+            using (var evt = new WaitAndSetOnce())
             {
                 var sink = new ForEach<TSource>.ForEachImpl(onNext, () => evt.Set());
 
@@ -315,7 +315,7 @@ namespace System.Reactive.Linq
             var seenValue = false;
             var ex = default(Exception);
 
-            using (var evt = new ManualResetEvent(false))
+            using (var evt = new WaitAndSetOnce())
             {
                 //
                 // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
@@ -409,7 +409,7 @@ namespace System.Reactive.Linq
             var seenValue = false;
             var ex = default(Exception);
 
-            using (var evt = new ManualResetEvent(false))
+            using (var evt = new WaitAndSetOnce())
             {
                 //
                 // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
@@ -471,6 +471,35 @@ namespace System.Reactive.Linq
         }
 #endif
 
+        class WaitAndSetOnce : IDisposable
+        {
+            private readonly ManualResetEvent _evt;
+            private int _hasSet;
+
+            public WaitAndSetOnce()
+            {
+                _evt = new ManualResetEvent(false);
+            }
+
+            public void Set()
+            {
+                if (Interlocked.Exchange(ref _hasSet, 1) == 0)
+                {
+                    _evt.Set();
+                }
+            }
+
+            public void WaitOne()
+            {
+                _evt.WaitOne();
+            }
+
+            public void Dispose()
+            {
+                _evt.Dispose();
+            }
+        }
+
         #endregion
     }
 }

+ 61 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableBlockingTest.cs

@@ -592,6 +592,67 @@ namespace ReactiveTests.Tests
             Assert.AreEqual(value, Observable.Range(value, 10).FirstOrDefault());
         }
 
+        [TestMethod]
+        public void FirstOrDefault_NoDoubleSet()
+        {
+            //
+            // Regression test for a possible race condition caused by Return style operators
+            // that could trigger two Set calls on a ManualResetEvent, causing it to get
+            // disposed in between those two calls (cf. FirstOrDefaultInternal). This led
+            // to an exception will the following stack trace:
+            //
+            //    System.ObjectDisposedException: Safe handle has been closed
+            //       at System.Runtime.InteropServices.SafeHandle.DangerousAddRef(Boolean& success)
+            //       at System.StubHelpers.StubHelpers.SafeHandleAddRef(SafeHandle pHandle, Boolean& success)
+            //       at Microsoft.Win32.Win32Native.SetEvent(SafeWaitHandle handle)
+            //       at System.Threading.EventWaitHandle.Set()
+            //       at System.Reactive.Linq.QueryLanguage.<>c__DisplayClass458_1`1.<FirstOrDefaultInternal>b__2()
+            //
+
+            var o = new O();
+
+            Scheduler.Default.Schedule(() =>
+            {
+                var x = o.FirstOrDefault();
+            });
+
+            o.Wait();
+
+            o.Next();
+
+            Thread.Sleep(100); // enough time to let the ManualResetEvent dispose
+
+            o.Done();
+        }
+
+        class O : IObservable<int>
+        {
+            private readonly ManualResetEvent _event = new ManualResetEvent(false);
+            private IObserver<int> _observer;
+
+            public void Wait()
+            {
+                _event.WaitOne();
+            }
+
+            public void Next()
+            {
+                _observer.OnNext(42);
+            }
+
+            public void Done()
+            {
+                _observer.OnCompleted();
+            }
+
+            public IDisposable Subscribe(IObserver<int> observer)
+            {
+                _observer = observer;
+                _event.Set();
+                return Disposable.Empty;
+            }
+        }
+
         #endregion
 
         #region + ForEach +