ソースを参照

Fixing a race in Join.

Bart De Smet 10 年 前
コミット
f35676fa35

+ 34 - 24
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs

@@ -103,9 +103,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 public void OnNext(TLeft value)
                 {
                     var id = 0;
+                    var rightID = 0;
                     lock (_parent._gate)
                     {
                         id = _parent._leftID++;
+                        rightID = _parent._rightID;
                         _parent._leftMap.Add(id, value);
                     }
 
@@ -128,21 +130,24 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     lock (_parent._gate)
                     {
-                        foreach (var rightValue in _parent._rightMap.Values)
+                        foreach (var rightValue in _parent._rightMap)
                         {
-                            var result = default(TResult);
-                            try
+                            if (rightValue.Key < rightID)
                             {
-                                result = _parent._parent._resultSelector(value, rightValue);
+                                var result = default(TResult);
+                                try
+                                {
+                                    result = _parent._parent._resultSelector(value, rightValue.Value);
+                                }
+                                catch (Exception exception)
+                                {
+                                    _parent._observer.OnError(exception);
+                                    _parent.Dispose();
+                                    return;
+                                }
+
+                                _parent._observer.OnNext(result);
                             }
-                            catch (Exception exception)
-                            {
-                                _parent._observer.OnError(exception);
-                                _parent.Dispose();
-                                return;
-                            }
-
-                            _parent._observer.OnNext(result);
                         }
                     }
                 }
@@ -231,9 +236,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 public void OnNext(TRight value)
                 {
                     var id = 0;
+                    var leftID = 0;
                     lock (_parent._gate)
                     {
                         id = _parent._rightID++;
+                        leftID = _parent._leftID;
                         _parent._rightMap.Add(id, value);
                     }
 
@@ -256,21 +263,24 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     lock (_parent._gate)
                     {
-                        foreach (var leftValue in _parent._leftMap.Values)
+                        foreach (var leftValue in _parent._leftMap)
                         {
-                            var result = default(TResult);
-                            try
+                            if (leftValue.Key < leftID)
                             {
-                                result = _parent._parent._resultSelector(leftValue, value);
+                                var result = default(TResult);
+                                try
+                                {
+                                    result = _parent._parent._resultSelector(leftValue.Value, value);
+                                }
+                                catch (Exception exception)
+                                {
+                                    _parent._observer.OnError(exception);
+                                    _parent.Dispose();
+                                    return;
+                                }
+
+                                _parent._observer.OnNext(result);
                             }
-                            catch (Exception exception)
-                            {
-                                _parent._observer.OnError(exception);
-                                _parent.Dispose();
-                                return;
-                            }
-
-                            _parent._observer.OnNext(result);
                         }
                     }
                 }