Browse Source

Fixing deadlock in GroupByUntil.

Bart De Smet 10 years ago
parent
commit
3378bc3dbb

+ 12 - 8
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs

@@ -58,6 +58,13 @@ namespace System.Reactive.Linq.ObservableImpl
                 _nullGate = new object();
             }
 
+            private ISubject<TElement> NewSubject()
+            {
+                var sub = new Subject<TElement>();
+
+                return Subject.Create<TElement>(new AsyncLockObserver<TElement>(sub, new Concurrency.AsyncLock()), sub);
+            }
+
             public void OnNext(TSource value)
             {
                 var key = default(TKey);
@@ -89,7 +96,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         {
                             if (_null == null)
                             {
-                                _null = new Subject<TElement>();
+                                _null = NewSubject();
                                 fireNewMapEntry = true;
                             }
 
@@ -98,7 +105,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        writer = _map.GetOrAdd(key, () => new Subject<TElement>(), out fireNewMapEntry);
+                        writer = _map.GetOrAdd(key, NewSubject, out fireNewMapEntry);
                     }
                 }
                 catch (Exception exception)
@@ -161,8 +168,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 //        revisit the behavior for vNext. Nonetheless, we'll add synchronization
                 //        to ensure no concurrent calls to the subject are made.
                 //
-                lock (writer)
-                    writer.OnNext(element);
+                writer.OnNext(element);
             }
 
             class Delta : IObserver<TDuration>
@@ -202,15 +208,13 @@ namespace System.Reactive.Linq.ObservableImpl
                             _parent._null = null;
                         }
 
-                        lock (@null)
-                            @null.OnCompleted();
+                        @null.OnCompleted();
                     }
                     else
                     {
                         if (_parent._map.Remove(_key))
                         {
-                            lock (_writer)
-                                _writer.OnCompleted();
+                            _writer.OnCompleted();
                         }
                     }
 

+ 10 - 2
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs

@@ -4860,11 +4860,13 @@ namespace ReactiveTests.Tests
 
             res["baR"].Messages.AssertEqual(
                 OnNext(390, "rab   "),
+                OnNext(420, "  RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                 OnCompleted<string>(420)
             );
 
             res["Baz"].Messages.AssertEqual(
                 OnNext(480, "  zab"),
+                OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                 OnCompleted<string>(510)
             );
 
@@ -5036,11 +5038,13 @@ namespace ReactiveTests.Tests
 
             res["baR"].Messages.AssertEqual(
                 OnNext(390, "rab   "),
+                OnNext(420, "  RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                 OnCompleted<string>(420)
-               );
+            );
 
             res["Baz"].Messages.AssertEqual(
                 OnNext(480, "  zab"),
+                OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                 OnCompleted<string>(510)
             );
 
@@ -6727,11 +6731,13 @@ namespace ReactiveTests.Tests
 
             res["baR"].Messages.AssertEqual(
                 OnNext(390, "rab   "),
+                OnNext(420, "  RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                 OnCompleted<string>(420)
             );
 
             res["Baz"].Messages.AssertEqual(
                 OnNext(480, "  zab"),
+                OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                 OnCompleted<string>(510)
             );
 
@@ -6903,11 +6909,13 @@ namespace ReactiveTests.Tests
 
             res["baR"].Messages.AssertEqual(
                 OnNext(390, "rab   "),
+                OnNext(420, "  RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                 OnCompleted<string>(420)
-               );
+            );
 
             res["Baz"].Messages.AssertEqual(
                 OnNext(480, "  zab"),
+                OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
                 OnCompleted<string>(510)
             );