Browse Source

Feature/2005 large composite disposable perf (#2092)

Ian Griffiths 1 year ago
parent
commit
8da262d87b

+ 71 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/GroupByCompletion.cs

@@ -0,0 +1,71 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Reactive.Linq;
+
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    /// <summary>
+    /// Completion of a wide fan-out/in scenario.
+    /// </summary>
+    /// <remarks>
+    /// <para>
+    /// This was added to address https://github.com/dotnet/reactive/issues/2005 in which completion
+    /// takes longer and longer to handle as the number of groups increases.
+    /// </para>
+    /// <para>
+    /// The queries in this benchmark represent the common 'fan out/in' pattern in Rx. It is often
+    /// useful to split a stream into groups to enable per-group processing, and then to recombine
+    /// the data back into a single stream. These benchmarks don't do any per-group processing, so
+    /// they might look pointless, but we're trying to measure the minimum unavoidable overhead
+    /// that any code using this technique will encounter.
+    /// </para>
+    /// </remarks>
+    [MemoryDiagnoser]
+    public class GroupByCompletion
+    {
+        private IObservable<int> observable;
+
+        [Params(200_000, 1_000_000)]
+        public int NumberOfSamples { get; set; }
+
+        [Params(10, 100, 1_000, 10_000, 100_000, 150_000, 200_000)]
+        public int NumberOfGroups { get; set; }
+
+        [GlobalSetup]
+        public void GlobalSetup()
+        {
+            var data = new int[NumberOfSamples];
+            for (var i = 0; i < data.Length; ++i)
+            {
+                data[i] = i;
+            }
+
+            observable = data.ToObservable();
+        }
+
+        [Benchmark]
+        public void GroupBySelectMany()
+        {
+            var numberOfGroups = NumberOfGroups;
+
+            observable!.GroupBy(value => value % numberOfGroups)
+                .SelectMany(groupOfInts => groupOfInts)
+                .Subscribe(intValue => { });
+        }
+
+        [Benchmark]
+        public void GroupByMerge()
+        {
+            var numberOfGroups = NumberOfGroups;
+
+            observable!.GroupBy(value => value % numberOfGroups)
+                .Merge()
+                .Subscribe(intValue => { });
+        }
+    }
+}

+ 2 - 1
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -27,7 +27,8 @@ namespace Benchmarks.System.Reactive
                 typeof(ScalarScheduleBenchmark),
                 typeof(ScalarScheduleBenchmark),
                 typeof(StableCompositeDisposableBenchmark),
                 typeof(StableCompositeDisposableBenchmark),
                 typeof(SubjectBenchmark),
                 typeof(SubjectBenchmark),
-                typeof(ComparisonAsyncBenchmark)
+                typeof(ComparisonAsyncBenchmark),
+                typeof(GroupByCompletion)
 #if (CURRENT)
 #if (CURRENT)
                 ,typeof(AppendPrependBenchmark)
                 ,typeof(AppendPrependBenchmark)
                 ,typeof(PrependVsStartWtihBenchmark)
                 ,typeof(PrependVsStartWtihBenchmark)

+ 164 - 35
Rx.NET/Source/src/System.Reactive/Disposables/CompositeDisposable.cs

@@ -4,6 +4,7 @@
 
 
 using System.Collections;
 using System.Collections;
 using System.Collections.Generic;
 using System.Collections.Generic;
+using System.Linq;
 using System.Threading;
 using System.Threading;
 
 
 namespace System.Reactive.Disposables
 namespace System.Reactive.Disposables
@@ -16,10 +17,17 @@ namespace System.Reactive.Disposables
     {
     {
         private readonly object _gate = new();
         private readonly object _gate = new();
         private bool _disposed;
         private bool _disposed;
-        private List<IDisposable?> _disposables;
+        private object _disposables;
         private int _count;
         private int _count;
         private const int ShrinkThreshold = 64;
         private const int ShrinkThreshold = 64;
 
 
+        // The maximum number of items to keep in a list before switching to a dictionary.
+        // Issue https://github.com/dotnet/reactive/issues/2005 reported that when a SelectMany
+        // observes large numbers (1000s) of observables, the CompositeDisposable it uses to
+        // keep track of all of the inner observables it creates becomes a bottleneck when the
+        // subscription completes.
+        private const int MaximumLinearSearchThreshold = 1024;
+
         // Default initial capacity of the _disposables list in case
         // Default initial capacity of the _disposables list in case
         // The number of items is not known upfront
         // The number of items is not known upfront
         private const int DefaultCapacity = 16;
         private const int DefaultCapacity = 16;
@@ -29,7 +37,7 @@ namespace System.Reactive.Disposables
         /// </summary>
         /// </summary>
         public CompositeDisposable()
         public CompositeDisposable()
         {
         {
-            _disposables = [];
+            _disposables = new List<IDisposable?>();
         }
         }
 
 
         /// <summary>
         /// <summary>
@@ -60,11 +68,11 @@ namespace System.Reactive.Disposables
                 throw new ArgumentNullException(nameof(disposables));
                 throw new ArgumentNullException(nameof(disposables));
             }
             }
 
 
-            _disposables = ToList(disposables);
+            (_disposables, _) = ToListOrDictionary(disposables);
 
 
             // _count can be read by other threads and thus should be properly visible
             // _count can be read by other threads and thus should be properly visible
             // also releases the _disposables contents so it becomes thread-safe
             // also releases the _disposables contents so it becomes thread-safe
-            Volatile.Write(ref _count, _disposables.Count);
+            Volatile.Write(ref _count, disposables.Length);
         }
         }
 
 
         /// <summary>
         /// <summary>
@@ -80,14 +88,14 @@ namespace System.Reactive.Disposables
                 throw new ArgumentNullException(nameof(disposables));
                 throw new ArgumentNullException(nameof(disposables));
             }
             }
 
 
-            _disposables = ToList(disposables);
+            (_disposables, var count) = ToListOrDictionary(disposables);
 
 
             // _count can be read by other threads and thus should be properly visible
             // _count can be read by other threads and thus should be properly visible
             // also releases the _disposables contents so it becomes thread-safe
             // also releases the _disposables contents so it becomes thread-safe
-            Volatile.Write(ref _count, _disposables.Count);
+            Volatile.Write(ref _count, count);
         }
         }
 
 
-        private static List<IDisposable?> ToList(IEnumerable<IDisposable> disposables)
+        private static (object Collection, int Count) ToListOrDictionary(IEnumerable<IDisposable> disposables)
         {
         {
             var capacity = disposables switch
             var capacity = disposables switch
             {
             {
@@ -96,6 +104,26 @@ namespace System.Reactive.Disposables
                 _ => DefaultCapacity
                 _ => DefaultCapacity
             };
             };
 
 
+            if (capacity > MaximumLinearSearchThreshold)
+            {
+                var dictionary = new Dictionary<IDisposable, int>(capacity);
+                var disposableCount = 0;
+                foreach (var d in disposables)
+                {
+                    if (d == null)
+                    {
+                        throw new ArgumentException(Strings_Core.DISPOSABLES_CANT_CONTAIN_NULL, nameof(disposables));
+                    }
+
+                    dictionary.TryGetValue(d, out var thisDisposableCount);
+                    dictionary[d] = thisDisposableCount + 1;
+
+                    disposableCount += 1;
+                }
+
+                return (dictionary, disposableCount);
+            }
+
             var list = new List<IDisposable?>(capacity);
             var list = new List<IDisposable?>(capacity);
 
 
             // do the copy and null-check in one step to avoid a
             // do the copy and null-check in one step to avoid a
@@ -110,7 +138,14 @@ namespace System.Reactive.Disposables
                 list.Add(d);
                 list.Add(d);
             }
             }
 
 
-            return list;
+            if (list.Count > MaximumLinearSearchThreshold)
+            {
+                // We end up here when the count was not known up front (the source was not an
+                // ICollection<IDisposable>) and the number of items turned out to exceed our maximum
+                // tolerance for linear search. This block does nothing: the list is returned as-is.
+            }
+
+            return (list, list.Count);
         }
         }
 
 
         /// <summary>
         /// <summary>
@@ -134,7 +169,37 @@ namespace System.Reactive.Disposables
             {
             {
                 if (!_disposed)
                 if (!_disposed)
                 {
                 {
-                    _disposables.Add(item);
+                    if (_disposables is List<IDisposable?> listDisposables)
+                    {
+                        listDisposables.Add(item);
+
+                        // Once we get to thousands of items (which happens with wide fan-out/in configurations)
+                        // the cost of linear search becomes too high. We switch to a dictionary at that point.
+                        // See https://github.com/dotnet/reactive/issues/2005
+                        if (listDisposables.Count > MaximumLinearSearchThreshold)
+                        {
+                            // If we've blown through this threshold, chances are there's more to come,
+                            // so allocate some more spare capacity.
+                            var dictionary = new Dictionary<IDisposable, int>(listDisposables.Count + (listDisposables.Count / 4));
+                            foreach (var d in listDisposables)
+                            {
+                                if (d is not null)
+                                {
+                                    dictionary.TryGetValue(d, out var thisDisposableCount);
+                                    dictionary[d] = thisDisposableCount + 1;
+                                }
+                            }
+
+                            _disposables = dictionary;
+                        }
+
+                    }
+                    else
+                    {
+                        var dictionaryDisposables = (Dictionary<IDisposable, int>)_disposables;
+                        dictionaryDisposables.TryGetValue(item, out var thisDisposableCount);
+                        dictionaryDisposables[item] = thisDisposableCount + 1;
+                    }
 
 
                     // If read atomically outside the lock, it should be written atomically inside
                     // If read atomically outside the lock, it should be written atomically inside
                     // the plain read on _count is fine here because manipulation always happens
                     // the plain read on _count is fine here because manipulation always happens
@@ -180,28 +245,49 @@ namespace System.Reactive.Disposables
                 // read fields as infrequently as possible
                 // read fields as infrequently as possible
                 var current = _disposables;
                 var current = _disposables;
 
 
-                var i = current.IndexOf(item);
-                if (i < 0)
+                if (current is List<IDisposable?> currentList)
                 {
                 {
-                    // not found, just return
-                    return false;
-                }
-
-                current[i] = null;
+                    var i = currentList.IndexOf(item);
+                    if (i < 0)
+                    {
+                        // not found, just return
+                        return false;
+                    }
 
 
-                if (current.Capacity > ShrinkThreshold && _count < current.Capacity / 2)
-                {
-                    var fresh = new List<IDisposable?>(current.Capacity / 2);
+                    currentList[i] = null;
 
 
-                    foreach (var d in current)
+                    if (currentList.Capacity > ShrinkThreshold && _count < currentList.Capacity / 2)
                     {
                     {
-                        if (d != null)
+                        var fresh = new List<IDisposable?>(currentList.Capacity / 2);
+
+                        foreach (var d in currentList)
                         {
                         {
-                            fresh.Add(d);
+                            if (d != null)
+                            {
+                                fresh.Add(d);
+                            }
                         }
                         }
+
+                        _disposables = fresh;
+                    } 
+                }
+                else
+                {
+                    var dictionaryDisposables = (Dictionary<IDisposable, int>)_disposables;
+                    if (!dictionaryDisposables.TryGetValue(item, out var thisDisposableCount))
+                    {
+                        return false;
                     }
                     }
 
 
-                    _disposables = fresh;
+                    thisDisposableCount -= 1;
+                    if (thisDisposableCount == 0)
+                    {
+                        dictionaryDisposables.Remove(item);
+                    }
+                    else
+                    {
+                        dictionaryDisposables[item] = thisDisposableCount;
+                    }
                 }
                 }
 
 
                 // make sure the Count property sees an atomic update
                 // make sure the Count property sees an atomic update
@@ -221,13 +307,15 @@ namespace System.Reactive.Disposables
         /// </summary>
         /// </summary>
         public void Dispose()
         public void Dispose()
         {
         {
-            List<IDisposable?>? currentDisposables = null;
+            List<IDisposable?>? currentDisposablesList = null;
+            Dictionary<IDisposable, int>? currentDisposablesDictionary = null;
 
 
             lock (_gate)
             lock (_gate)
             {
             {
                 if (!_disposed)
                 if (!_disposed)
                 {
                 {
-                    currentDisposables = _disposables;
+                    currentDisposablesList = _disposables as List<IDisposable?>;
+                    currentDisposablesDictionary = _disposables as Dictionary<IDisposable, int>;
 
 
                     // nulling out the reference is faster no risk to
                     // nulling out the reference is faster no risk to
                     // future Add/Remove because _disposed will be true
                     // future Add/Remove because _disposed will be true
@@ -239,13 +327,24 @@ namespace System.Reactive.Disposables
                 }
                 }
             }
             }
 
 
-            if (currentDisposables != null)
+            if (currentDisposablesList is not null)
             {
             {
-                foreach (var d in currentDisposables)
+                foreach (var d in currentDisposablesList)
                 {
                 {
+                    // Although we don't allow nulls in from the outside, we implement Remove
+                    // by setting entries to null, and shrinking the list if it gets too sparse.
+                    // So some entries may be null.
                     d?.Dispose();
                     d?.Dispose();
                 }
                 }
             }
             }
+
+            if (currentDisposablesDictionary is not null)
+            {
+                foreach (var kv in currentDisposablesDictionary)
+                {
+                    kv.Key.Dispose();
+                }
+            }
         }
         }
 
 
         /// <summary>
         /// <summary>
@@ -265,8 +364,18 @@ namespace System.Reactive.Disposables
 
 
                 var current = _disposables;
                 var current = _disposables;
 
 
-                previousDisposables = current.ToArray();
-                current.Clear();
+                if (current is List<IDisposable?> currentList)
+                {
+                    previousDisposables = currentList.ToArray();
+                    currentList.Clear();
+                }
+                else
+                {
+                    var currentDictionary = (Dictionary<IDisposable, int>)current;
+                    previousDisposables = new IDisposable[currentDictionary.Count];
+                    currentDictionary.Keys.CopyTo(previousDisposables!, 0);
+                    currentDictionary.Clear();
+                }
 
 
                 Volatile.Write(ref _count, 0);
                 Volatile.Write(ref _count, 0);
             }
             }
@@ -297,7 +406,10 @@ namespace System.Reactive.Disposables
                     return false;
                     return false;
                 }
                 }
 
 
-                return _disposables.Contains(item);
+                var current = _disposables;
+                return current is List<IDisposable?> list
+                    ? list.Contains(item)
+                    : ((Dictionary<IDisposable, int>) current).ContainsKey(item);
             }
             }
         }
         }
 
 
@@ -336,12 +448,27 @@ namespace System.Reactive.Disposables
                 }
                 }
                 
                 
                 var i = arrayIndex;
                 var i = arrayIndex;
-                
-                foreach (var d in _disposables)
+
+                var current = _disposables;
+
+                if (current is List<IDisposable?> currentList)
+                {
+                    foreach (var d in currentList)
+                    {
+                        if (d != null)
+                        {
+                            array[i++] = d;
+                        }
+                    }
+                }
+                else
                 {
                 {
-                    if (d != null)
+                    foreach (var kv in (Dictionary<IDisposable, int>)current)
                     {
                     {
-                        array[i++] = d;
+                        for (var j = 0; j < kv.Value; j++)
+                        {
+                            array[i++] = kv.Key;
+                        }
                     }
                     }
                 }
                 }
             }
             }
@@ -365,9 +492,11 @@ namespace System.Reactive.Disposables
                     return EmptyEnumerator;
                     return EmptyEnumerator;
                 }
                 }
 
 
+                var current = _disposables;
+
                 // the copy is unavoidable but the creation
                 // the copy is unavoidable but the creation
                 // of an outer IEnumerable is avoidable
                 // of an outer IEnumerable is avoidable
-                return new CompositeEnumerator(_disposables.ToArray());
+                return new CompositeEnumerator(current is List<IDisposable?> currentList ? currentList.ToArray() : ((Dictionary<IDisposable, int>)current).Keys.ToArray());
             }
             }
         }
         }