瀏覽代碼

Optimizing layouts of GroupJoin and Join.

Bart De Smet 8 年之前
父節點
當前提交
1881892886

+ 32 - 31
Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupJoin.cs

@@ -29,52 +29,53 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             var sink = new _(this, observer, cancel);
             setSink(sink);
-            return sink.Run();
+            return sink.Run(this);
         }
 
-        class _ : Sink<TResult>
+        private sealed class _ : Sink<TResult>
         {
-            private readonly GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> _parent;
+            private readonly object _gate = new object();
+            private readonly CompositeDisposable _group = new CompositeDisposable();
+            private readonly RefCountDisposable _refCount;
+            private readonly SortedDictionary<int, IObserver<TRight>> _leftMap = new SortedDictionary<int, IObserver<TRight>>();
+            private readonly SortedDictionary<int, TRight> _rightMap = new SortedDictionary<int, TRight>();
+
+            private readonly Func<TLeft, IObservable<TLeftDuration>> _leftDurationSelector;
+            private readonly Func<TRight, IObservable<TRightDuration>> _rightDurationSelector;
+            private readonly Func<TLeft, IObservable<TRight>, TResult> _resultSelector;
 
             public _(GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
-                _parent = parent;
-            }
+                _refCount = new RefCountDisposable(_group);
+                _leftMap = new SortedDictionary<int, IObserver<TRight>>();
+                _rightMap = new SortedDictionary<int, TRight>();
 
-            private object _gate;
-            private CompositeDisposable _group;
-            private RefCountDisposable _refCount;
+                _leftDurationSelector = parent._leftDurationSelector;
+                _rightDurationSelector = parent._rightDurationSelector;
+                _resultSelector = parent._resultSelector;
+            }
 
             private int _leftID;
-            private SortedDictionary<int, IObserver<TRight>> _leftMap;
-
             private int _rightID;
-            private SortedDictionary<int, TRight> _rightMap;
 
-            public IDisposable Run()
+            public IDisposable Run(GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent)
             {
-                _gate = new object();
-                _group = new CompositeDisposable();
-                _refCount = new RefCountDisposable(_group);
-
                 var leftSubscription = new SingleAssignmentDisposable();
                 _group.Add(leftSubscription);
                 _leftID = 0;
-                _leftMap = new SortedDictionary<int, IObserver<TRight>>();
 
                 var rightSubscription = new SingleAssignmentDisposable();
                 _group.Add(rightSubscription);
                 _rightID = 0;
-                _rightMap = new SortedDictionary<int, TRight>();
 
-                leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
-                rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
+                leftSubscription.Disposable = parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
+                rightSubscription.Disposable = parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
 
                 return _refCount;
             }
 
-            class LeftObserver : IObserver<TLeft>
+            private sealed class LeftObserver : IObserver<TLeft>
             {
                 private readonly _ _parent;
                 private readonly IDisposable _self;
@@ -119,7 +120,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     var duration = default(IObservable<TLeftDuration>);
                     try
                     {
-                        duration = _parent._parent._leftDurationSelector(value);
+                        duration = _parent._leftDurationSelector(value);
                     }
                     catch (Exception exception)
                     {
@@ -128,12 +129,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
 
                     // BREAKING CHANGE v2 > v1.x - The duration sequence is subscribed to before the result sequence is evaluated.
-                    md.Disposable = duration.SubscribeSafe(new Delta(this, id, s, md));
+                    md.Disposable = duration.SubscribeSafe(new DurationObserver(this, id, s, md));
 
                     var result = default(TResult);
                     try
                     {
-                        result = _parent._parent._resultSelector(value, window);
+                        result = _parent._resultSelector(value, window);
                     }
                     catch (Exception exception)
                     {
@@ -155,14 +156,14 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                 }
 
-                class Delta : IObserver<TLeftDuration>
+                private sealed class DurationObserver : IObserver<TLeftDuration>
                 {
                     private readonly LeftObserver _parent;
                     private readonly int _id;
                     private readonly IObserver<TRight> _group;
                     private readonly IDisposable _self;
 
-                    public Delta(LeftObserver parent, int id, IObserver<TRight> group, IDisposable self)
+                    public DurationObserver(LeftObserver parent, int id, IObserver<TRight> group, IDisposable self)
                     {
                         _parent = parent;
                         _id = id;
@@ -212,7 +213,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
             }
 
-            class RightObserver : IObserver<TRight>
+            private sealed class RightObserver : IObserver<TRight>
             {
                 private readonly _ _parent;
                 private readonly IDisposable _self;
@@ -250,7 +251,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     var duration = default(IObservable<TRightDuration>);
                     try
                     {
-                        duration = _parent._parent._rightDurationSelector(value);
+                        duration = _parent._rightDurationSelector(value);
                     }
                     catch (Exception exception)
                     {
@@ -258,7 +259,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         return;
                     }
 
-                    md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
+                    md.Disposable = duration.SubscribeSafe(new DurationObserver(this, id, md));
 
                     lock (_parent._gate)
                     {
@@ -272,13 +273,13 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                 }
 
-                class Delta : IObserver<TRightDuration>
+                private sealed class DurationObserver : IObserver<TRightDuration>
                 {
                     private readonly RightObserver _parent;
                     private readonly int _id;
                     private readonly IDisposable _self;
 
-                    public Delta(RightObserver parent, int id, IDisposable self)
+                    public DurationObserver(RightObserver parent, int id, IDisposable self)
                     {
                         _parent = parent;
                         _id = id;

+ 18 - 21
Rx.NET/Source/src/System.Reactive/Linq/Observable/Join.cs

@@ -28,51 +28,48 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             var sink = new _(this, observer, cancel);
             setSink(sink);
-            return sink.Run();
+            return sink.Run(this);
         }
 
         private sealed class _ : Sink<TResult>
         {
-            // CONSIDER: This sink has a parent reference that can be considered for removal.
+            private readonly object _gate = new object();
+            private readonly CompositeDisposable _group = new CompositeDisposable();
+            private readonly SortedDictionary<int, TLeft> _leftMap = new SortedDictionary<int, TLeft>();
+            private readonly SortedDictionary<int, TRight> _rightMap = new SortedDictionary<int, TRight>();
 
-            private readonly Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> _parent;
+            private readonly Func<TLeft, IObservable<TLeftDuration>> _leftDurationSelector;
+            private readonly Func<TRight, IObservable<TRightDuration>> _rightDurationSelector;
+            private readonly Func<TLeft, TRight, TResult> _resultSelector;
 
             public _(Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
-                _parent = parent;
+                _leftDurationSelector = parent._leftDurationSelector;
+                _rightDurationSelector = parent._rightDurationSelector;
+                _resultSelector = parent._resultSelector;
             }
 
-            private object _gate;
-            private CompositeDisposable _group;
-
             private bool _leftDone;
             private int _leftID;
-            private SortedDictionary<int, TLeft> _leftMap;
 
             private bool _rightDone;
             private int _rightID;
-            private SortedDictionary<int, TRight> _rightMap;
 
-            public IDisposable Run()
+            public IDisposable Run(Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent)
             {
-                _gate = new object();
-                _group = new CompositeDisposable();
-
                 var leftSubscription = new SingleAssignmentDisposable();
                 _group.Add(leftSubscription);
                 _leftDone = false;
                 _leftID = 0;
-                _leftMap = new SortedDictionary<int, TLeft>();
 
                 var rightSubscription = new SingleAssignmentDisposable();
                 _group.Add(rightSubscription);
                 _rightDone = false;
                 _rightID = 0;
-                _rightMap = new SortedDictionary<int, TRight>();
 
-                leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
-                rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
+                leftSubscription.Disposable = parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
+                rightSubscription.Disposable = parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
 
                 return _group;
             }
@@ -119,7 +116,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     var duration = default(IObservable<TLeftDuration>);
                     try
                     {
-                        duration = _parent._parent._leftDurationSelector(value);
+                        duration = _parent._leftDurationSelector(value);
                     }
                     catch (Exception exception)
                     {
@@ -139,7 +136,7 @@ namespace System.Reactive.Linq.ObservableImpl
                                 var result = default(TResult);
                                 try
                                 {
-                                    result = _parent._parent._resultSelector(value, rightValue.Value);
+                                    result = _parent._resultSelector(value, rightValue.Value);
                                 }
                                 catch (Exception exception)
                                 {
@@ -252,7 +249,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     var duration = default(IObservable<TRightDuration>);
                     try
                     {
-                        duration = _parent._parent._rightDurationSelector(value);
+                        duration = _parent._rightDurationSelector(value);
                     }
                     catch (Exception exception)
                     {
@@ -272,7 +269,7 @@ namespace System.Reactive.Linq.ObservableImpl
                                 var result = default(TResult);
                                 try
                                 {
-                                    result = _parent._parent._resultSelector(leftValue.Value, value);
+                                    result = _parent._resultSelector(leftValue.Value, value);
                                 }
                                 catch (Exception exception)
                                 {