瀏覽代碼

Preventing race similar to Join in GroupJoin.

Bart De Smet 10 年之前
父節點
當前提交
7ca2e8a947
共有 1 個文件被更改,包括 32 次插入8 次删除
  1. 32 8
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs

+ 32 - 8
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs

@@ -88,8 +88,12 @@ namespace System.Reactive.Linq.ObservableImpl
                 private void Expire(int id, IObserver<TRight> group, IDisposable resource)
                 {
                     lock (_parent._gate)
+                    {
                         if (_parent._leftMap.Remove(id))
+                        {
                             group.OnCompleted();
+                        }
+                    }
 
                     _parent._group.Remove(resource);
                 }
@@ -98,9 +102,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     var s = new Subject<TRight>();
                     var id = 0;
+                    var rightID = 0;
                     lock (_parent._gate)
                     {
                         id = _parent._leftID++;
+                        rightID = _parent._rightID;
                         _parent._leftMap.Add(id, s);
                     }
 
@@ -139,9 +145,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     {
                         _parent._observer.OnNext(result);
 
-                        foreach (var rightValue in _parent._rightMap.Values)
+                        foreach (var rightValue in _parent._rightMap)
                         {
-                            s.OnNext(rightValue);
+                            if (rightValue.Key < rightID)
+                            {
+                                s.OnNext(rightValue.Value);
+                            }
                         }
                     }
                 }
@@ -181,8 +190,10 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     lock (_parent._gate)
                     {
-                        foreach (var o in _parent._leftMap.Values)
-                            o.OnError(error);
+                        foreach (var o in _parent._leftMap)
+                        {
+                            o.Value.OnError(error);
+                        }
 
                         _parent._observer.OnError(error);
                         _parent.Dispose();
@@ -215,7 +226,9 @@ namespace System.Reactive.Linq.ObservableImpl
                 private void Expire(int id, IDisposable resource)
                 {
                     lock (_parent._gate)
+                    {
                         _parent._rightMap.Remove(id);
+                    }
 
                     _parent._group.Remove(resource);
                 }
@@ -223,9 +236,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 public void OnNext(TRight value)
                 {
                     var id = 0;
+                    var leftID = 0;
                     lock (_parent._gate)
                     {
                         id = _parent._rightID++;
+                        leftID = _parent._leftID;
                         _parent._rightMap.Add(id, value);
                     }
 
@@ -242,12 +257,18 @@ namespace System.Reactive.Linq.ObservableImpl
                         OnError(exception);
                         return;
                     }
+
                     md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
 
                     lock (_parent._gate)
                     {
-                        foreach (var o in _parent._leftMap.Values)
-                            o.OnNext(value);
+                        foreach (var o in _parent._leftMap)
+                        {
+                            if (o.Key < leftID)
+                            {
+                                o.Value.OnNext(value);
+                            }
+                        }
                     }
                 }
 
@@ -284,8 +305,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     lock (_parent._gate)
                     {
-                        foreach (var o in _parent._leftMap.Values)
-                            o.OnError(error);
+                        foreach (var o in _parent._leftMap)
+                        {
+                            o.Value.OnError(error);
+                        }
+
                         _parent._observer.OnError(error);
                         _parent.Dispose();
                     }