Procházet zdrojové kódy

Optimizing GroupBy[Until] operator layouts.

Bart De Smet před 8 roky
rodič
revize
f70ed17d38

+ 4 - 8
Rx.NET/Source/src/System.Reactive/Linq/GroupedObservable.cs

@@ -7,29 +7,25 @@ using System.Reactive.Subjects;
 
 namespace System.Reactive.Linq
 {
-    class GroupedObservable<TKey, TElement> : ObservableBase<TElement>, IGroupedObservable<TKey, TElement>
+    internal sealed class GroupedObservable<TKey, TElement> : ObservableBase<TElement>, IGroupedObservable<TKey, TElement>
     {
-        private readonly TKey _key;
         private readonly IObservable<TElement> _subject;
         private readonly RefCountDisposable _refCount;
 
         public GroupedObservable(TKey key, ISubject<TElement> subject, RefCountDisposable refCount)
         {
-            _key = key;
+            Key = key;
             _subject = subject;
             _refCount = refCount;
         }
 
         public GroupedObservable(TKey key, ISubject<TElement> subject)
         {
-            _key = key;
+            Key = key;
             _subject = subject;
         }
 
-        public TKey Key
-        {
-            get { return _key; }
-        }
+        public TKey Key { get; }
 
         protected override IDisposable SubscribeCore(IObserver<TElement> observer)
         {

+ 27 - 25
Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupBy.cs

@@ -25,48 +25,52 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
-        private CompositeDisposable _groupDisposable;
-        private RefCountDisposable _refCountDisposable;
-
         protected override IDisposable Run(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
-            _groupDisposable = new CompositeDisposable();
-            _refCountDisposable = new RefCountDisposable(_groupDisposable);
-
             var sink = new _(this, observer, cancel);
             setSink(sink);
-            _groupDisposable.Add(_source.SubscribeSafe(sink));
-
-            return _refCountDisposable;
+            return sink.Run(_source);
         }
 
-        class _ : Sink<IGroupedObservable<TKey, TElement>>, IObserver<TSource>
+        private sealed class _ : Sink<IGroupedObservable<TKey, TElement>>, IObserver<TSource>
         {
-            private readonly GroupBy<TSource, TKey, TElement> _parent;
-            private readonly Dictionary<TKey, ISubject<TElement>> _map;
-            private ISubject<TElement> _null;
+            private readonly Func<TSource, TKey> _keySelector;
+            private readonly Func<TSource, TElement> _elementSelector;
+            private readonly Dictionary<TKey, Subject<TElement>> _map;
+
+            private RefCountDisposable _refCountDisposable;
+            private Subject<TElement> _null;
 
             public _(GroupBy<TSource, TKey, TElement> parent, IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
-                _parent = parent;
+                _keySelector = parent._keySelector;
+                _elementSelector = parent._elementSelector;
 
-                if (_parent._capacity.HasValue)
+                if (parent._capacity.HasValue)
                 {
-                    _map = new Dictionary<TKey, ISubject<TElement>>(_parent._capacity.Value, _parent._comparer);
+                    _map = new Dictionary<TKey, Subject<TElement>>(parent._capacity.Value, parent._comparer);
                 }
                 else
                 {
-                    _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer);
+                    _map = new Dictionary<TKey, Subject<TElement>>(parent._comparer);
                 }
             }
 
+            public IDisposable Run(IObservable<TSource> source)
+            {
+                var sourceSubscription = new SingleAssignmentDisposable();
+                _refCountDisposable = new RefCountDisposable(sourceSubscription);
+                sourceSubscription.Disposable = source.SubscribeSafe(this);
+                return _refCountDisposable;
+            }
+
             public void OnNext(TSource value)
             {
                 var key = default(TKey);
                 try
                 {
-                    key = _parent._keySelector(value);
+                    key = _keySelector(value);
                 }
                 catch (Exception exception)
                 {
@@ -75,7 +79,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
 
                 var fireNewMapEntry = false;
-                var writer = default(ISubject<TElement>);
+                var writer = default(Subject<TElement>);
                 try
                 {
                     //
@@ -124,14 +128,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 if (fireNewMapEntry)
                 {
-                    var group = new GroupedObservable<TKey, TElement>(key, writer, _parent._refCountDisposable);
+                    var group = new GroupedObservable<TKey, TElement>(key, writer, _refCountDisposable);
                     _observer.OnNext(group);
                 }
 
                 var element = default(TElement);
                 try
                 {
-                    element = _parent._elementSelector(value);
+                    element = _elementSelector(value);
                 }
                 catch (Exception exception)
                 {
@@ -149,8 +153,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void OnCompleted()
             {
-                if (_null != null)
-                    _null.OnCompleted();
+                _null?.OnCompleted();
 
                 foreach (var w in _map.Values)
                     w.OnCompleted();
@@ -161,8 +164,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             private void Error(Exception exception)
             {
-                if (_null != null)
-                    _null.OnError(exception);
+                _null?.OnError(exception);
 
                 foreach (var w in _map.Values)
                     w.OnError(exception);

+ 40 - 42
Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs

@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
 using System.Reactive.Disposables;
@@ -28,34 +29,42 @@ namespace System.Reactive.Linq.ObservableImpl
             _comparer = comparer;
         }
 
-        private CompositeDisposable _groupDisposable;
-        private RefCountDisposable _refCountDisposable;
-
         protected override IDisposable Run(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
-            _groupDisposable = new CompositeDisposable();
-            _refCountDisposable = new RefCountDisposable(_groupDisposable);
-
             var sink = new _(this, observer, cancel);
             setSink(sink);
-            _groupDisposable.Add(_source.SubscribeSafe(sink));
-
-            return _refCountDisposable;
+            return sink.Run(_source);
         }
 
-        class _ : Sink<IGroupedObservable<TKey, TElement>>, IObserver<TSource>
+        private sealed class _ : Sink<IGroupedObservable<TKey, TElement>>, IObserver<TSource>
         {
-            private readonly GroupByUntil<TSource, TKey, TElement, TDuration> _parent;
+            private readonly object _nullGate = new object();
+            private readonly CompositeDisposable _groupDisposable = new CompositeDisposable();
+            private readonly RefCountDisposable _refCountDisposable;
             private readonly Map<TKey, ISubject<TElement>> _map;
+
+            private readonly Func<TSource, TKey> _keySelector;
+            private readonly Func<TSource, TElement> _elementSelector;
+            private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector;
+
             private ISubject<TElement> _null;
-            private object _nullGate;
 
             public _(GroupByUntil<TSource, TKey, TElement, TDuration> parent, IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
-                _parent = parent;
-                _map = new Map<TKey, ISubject<TElement>>(_parent._capacity, _parent._comparer);
-                _nullGate = new object();
+                _refCountDisposable = new RefCountDisposable(_groupDisposable);
+                _map = new Map<TKey, ISubject<TElement>>(parent._capacity, parent._comparer);
+
+                _keySelector = parent._keySelector;
+                _elementSelector = parent._elementSelector;
+                _durationSelector = parent._durationSelector;
+            }
+
+            public IDisposable Run(IObservable<TSource> source)
+            {
+                _groupDisposable.Add(source.SubscribeSafe(this));
+
+                return _refCountDisposable;
             }
 
             private ISubject<TElement> NewSubject()
@@ -70,7 +79,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 var key = default(TKey);
                 try
                 {
-                    key = _parent._keySelector(value);
+                    key = _keySelector(value);
                 }
                 catch (Exception exception)
                 {
@@ -116,14 +125,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 if (fireNewMapEntry)
                 {
-                    var group = new GroupedObservable<TKey, TElement>(key, writer, _parent._refCountDisposable);
+                    var group = new GroupedObservable<TKey, TElement>(key, writer, _refCountDisposable);
 
                     var duration = default(IObservable<TDuration>);
 
                     var durationGroup = new GroupedObservable<TKey, TElement>(key, writer);
                     try
                     {
-                        duration = _parent._durationSelector(durationGroup);
+                        duration = _durationSelector(durationGroup);
                     }
                     catch (Exception exception)
                     {
@@ -135,14 +144,14 @@ namespace System.Reactive.Linq.ObservableImpl
                         base._observer.OnNext(group);
 
                     var md = new SingleAssignmentDisposable();
-                    _parent._groupDisposable.Add(md);
-                    md.Disposable = duration.SubscribeSafe(new Delta(this, key, writer, md));
+                    _groupDisposable.Add(md);
+                    md.Disposable = duration.SubscribeSafe(new DurationObserver(this, key, writer, md));
                 }
 
                 var element = default(TElement);
                 try
                 {
-                    element = _parent._elementSelector(value);
+                    element = _elementSelector(value);
                 }
                 catch (Exception exception)
                 {
@@ -171,14 +180,14 @@ namespace System.Reactive.Linq.ObservableImpl
                 writer.OnNext(element);
             }
 
-            class Delta : IObserver<TDuration>
+            private sealed class DurationObserver : IObserver<TDuration>
             {
                 private readonly _ _parent;
                 private readonly TKey _key;
                 private readonly ISubject<TElement> _writer;
                 private readonly IDisposable _self;
 
-                public Delta(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
+                public DurationObserver(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
                 {
                     _parent = parent;
                     _key = key;
@@ -218,7 +227,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                     }
 
-                    _parent._parent._groupDisposable.Remove(_self);
+                    _parent._groupDisposable.Remove(_self);
                 }
             }
 
@@ -238,8 +247,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 lock (_nullGate)
                     @null = _null;
 
-                if (@null != null)
-                    @null.OnCompleted();
+                @null?.OnCompleted();
 
                 foreach (var w in _map.Values)
                     w.OnCompleted();
@@ -261,8 +269,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 lock (_nullGate)
                     @null = _null;
 
-                if (@null != null)
-                    @null.OnError(exception);
+                @null?.OnError(exception);
 
                 foreach (var w in _map.Values)
                     w.OnError(exception);
@@ -286,22 +293,19 @@ namespace System.Reactive.Linq.ObservableImpl
         // compromise.
         private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4;
 
-        private static int DefaultConcurrencyLevel
-        {
-            get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; }
-        }
+        private static int DefaultConcurrencyLevel => DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount;
 
-        private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue> _map;
+        private readonly ConcurrentDictionary<TKey, TValue> _map;
 
         public Map(int? capacity, IEqualityComparer<TKey> comparer)
         {
             if (capacity.HasValue)
             {
-                _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer);
+                _map = new ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer);
             }
             else
             {
-                _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+                _map = new ConcurrentDictionary<TKey, TValue>(comparer);
             }
         }
 
@@ -334,13 +338,7 @@ namespace System.Reactive.Linq.ObservableImpl
             return value;
         }
 
-        public IEnumerable<TValue> Values
-        {
-            get
-            {
-                return _map.Values.ToArray();
-            }
-        }
+        public IEnumerable<TValue> Values => _map.Values.ToArray();
 
         public bool Remove(TKey key)
         {