Browse Source

4.x: Reduce StableCompositeDisposable.Create overhead via dedicated implementation

akarnokd 7 years ago
parent
commit
0fc86a0c41

+ 2 - 2
Rx.NET/Source/System.Reactive.sln

@@ -342,8 +342,8 @@ Global
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Debug|x64.Build.0 = Rx.net 4.0|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Debug|x86.ActiveCfg = Rx.net 4.0|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Debug|x86.Build.0 = Rx.net 4.0|Any CPU
-		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|Any CPU.ActiveCfg = Rx.net 4.0|Any CPU
-		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|Any CPU.Build.0 = Rx.net 4.0|Any CPU
+		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|Any CPU.ActiveCfg = Current Sources|Any CPU
+		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|Any CPU.Build.0 = Current Sources|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|ARM.ActiveCfg = Rx.net 4.0|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|ARM.Build.0 = Rx.net 4.0|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|x64.ActiveCfg = Rx.net 4.0|Any CPU

+ 1 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -23,6 +23,7 @@ namespace Benchmarks.System.Reactive
                 typeof(ToObservableBenchmark),
                 typeof(RepeatBenchmark),
                 typeof(ComparisonBenchmark),
+                typeof(StableCompositeDisposableBenchmark),
                 typeof(ComparisonAsyncBenchmark)
 #if (CURRENT)
                 ,typeof(AppendPrependBenchmark)

+ 109 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/StableCompositeDisposableBenchmark.cs

@@ -0,0 +1,109 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    [MemoryDiagnoser]
+    public class StableCompositeDisposableBenchmark
+    {
+        [Params(3, 4, 5, 6, 7, 8, 9, 10, 100)]
+        public int N;
+
+        private IDisposable[] _array;
+
+        private List<IDisposable> _list;
+
+        [GlobalSetup]
+        public void Setup()
+        {
+            _array = new IDisposable[N];
+            for (var i = 0; i < N; i++)
+            {
+                _array[i] = Disposable.Empty;
+            }
+            _list = new List<IDisposable>(_array);
+        }
+
+        [Benchmark]
+        public object Create_Array()
+        {
+            return StableCompositeDisposable.Create(_array);
+        }
+
+        [Benchmark]
+        public object Create_Trusted_Array()
+        {
+            return CreateTrusted(_array);
+        }
+
+        [Benchmark]
+        public object Create_List()
+        {
+            return StableCompositeDisposable.Create(_list);
+        }
+
+        [Benchmark]
+        public object Dispose_Array()
+        {
+            var scd = StableCompositeDisposable.Create(_array);
+            scd.Dispose();
+            return scd;
+        }
+
+        [Benchmark]
+        public object Dispose_List()
+        {
+            var scd = StableCompositeDisposable.Create(_list);
+            scd.Dispose();
+            return scd;
+        }
+
+        [Benchmark]
+        public object Dispose_Trused_Array()
+        {
+            var scd = CreateTrusted(_array);
+            scd.Dispose();
+            return scd;
+        }
+
+        // The StableCompositeDisposable.CreateTrusted is inaccessible and
+        // adding the InternalsVisibleTo attribute doesn't work (needs signed assemblies?)
+        internal static ICancelable CreateTrusted(params IDisposable[] disposables)
+        {
+            return new NAryTrustedArray(disposables);
+        }
+
+        private sealed class NAryTrustedArray : StableCompositeDisposable
+        {
+            private IDisposable[] _disposables;
+
+            public NAryTrustedArray(IDisposable[] disposables)
+            {
+                Volatile.Write(ref _disposables, disposables);
+            }
+
+            public override bool IsDisposed => Volatile.Read(ref _disposables) == null;
+
+            public override void Dispose()
+            {
+                var old = Interlocked.Exchange(ref _disposables, null);
+                if (old != null)
+                {
+                    foreach (var d in old)
+                    {
+                        d.Dispose();
+                    }
+                }
+            }
+        }
+    }
+}

+ 77 - 9
Rx.NET/Source/src/System.Reactive/Disposables/StableCompositeDisposable.cs

@@ -45,7 +45,19 @@ namespace System.Reactive.Disposables
                 throw new ArgumentNullException(nameof(disposables));
             }
 
-            return new NAry(disposables);
+            return new NAryArray(disposables);
+        }
+
+        /// <summary>
+        /// Creates a group of disposable resources that are disposed together
+        /// and without copying or checking for nulls inside the group.
+        /// </summary>
+        /// <param name="disposables">The array of disposables that is trusted
+        /// to not contain nulls and gives no need to defensively copy it.</param>
+        /// <returns>Group of disposable resources that are disposed together.</returns>
+        internal static ICancelable CreateTrusted(params IDisposable[] disposables)
+        {
+            return new NAryTrustedArray(disposables);
         }
 
         /// <summary>
@@ -60,7 +72,7 @@ namespace System.Reactive.Disposables
                 throw new ArgumentNullException(nameof(disposables));
             }
 
-            return new NAry(disposables);
+            return new NAryEnumerable(disposables);
         }
 
         /// <summary>
@@ -96,16 +108,11 @@ namespace System.Reactive.Disposables
             }
         }
 
-        private sealed class NAry : StableCompositeDisposable
+        private sealed class NAryEnumerable : StableCompositeDisposable
         {
             private volatile List<IDisposable> _disposables;
 
-            public NAry(IDisposable[] disposables)
-                : this((IEnumerable<IDisposable>)disposables)
-            {
-            }
-
-            public NAry(IEnumerable<IDisposable> disposables)
+            public NAryEnumerable(IEnumerable<IDisposable> disposables)
             {
                 _disposables = new List<IDisposable>(disposables);
 
@@ -132,5 +139,66 @@ namespace System.Reactive.Disposables
                 }
             }
         }
+
+        private sealed class NAryArray : StableCompositeDisposable
+        {
+            private IDisposable[] _disposables;
+
+            public NAryArray(IDisposable[] disposables)
+            {
+                var n = disposables.Length;
+                var ds = new IDisposable[n];
+                // These are likely already vectorized in the framework
+                // At least they are faster than loop-copying
+                Array.Copy(disposables, 0, ds, 0, n);
+                if (Array.IndexOf(ds, null) != -1)
+                {
+                    throw new ArgumentException(Strings_Core.DISPOSABLES_CANT_CONTAIN_NULL, nameof(disposables));
+                }
+                Volatile.Write(ref _disposables, ds);
+            }
+
+            public override bool IsDisposed => Volatile.Read(ref _disposables) == null;
+
+            public override void Dispose()
+            {
+                var old = Interlocked.Exchange(ref _disposables, null);
+                if (old != null)
+                {
+                    foreach (var d in old)
+                    {
+                        d.Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// A stable composite that doesn't do defensive copy of
+        /// the input disposable array nor checks it for null.
+        /// </summary>
+        private sealed class NAryTrustedArray : StableCompositeDisposable
+        {
+            private IDisposable[] _disposables;
+
+            public NAryTrustedArray(IDisposable[] disposables)
+            {
+                Volatile.Write(ref _disposables, disposables);
+            }
+
+            public override bool IsDisposed => Volatile.Read(ref _disposables) == null;
+
+            public override void Dispose()
+            {
+                var old = Interlocked.Exchange(ref _disposables, null);
+                if (old != null)
+                {
+                    foreach (var d in old)
+                    {
+                        d.Dispose();
+                    }
+                }
+            }
+        }
     }
 }

+ 14 - 14
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.Generated.cs

@@ -57,7 +57,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer2.SetResource(source2.SubscribeSafe(_observer2));
                 _observer3.SetResource(source3.SubscribeSafe(_observer3));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value);
@@ -114,7 +114,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer3.SetResource(source3.SubscribeSafe(_observer3));
                 _observer4.SetResource(source4.SubscribeSafe(_observer4));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value);
@@ -176,7 +176,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer4.SetResource(source4.SubscribeSafe(_observer4));
                 _observer5.SetResource(source5.SubscribeSafe(_observer5));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value);
@@ -243,7 +243,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer5.SetResource(source5.SubscribeSafe(_observer5));
                 _observer6.SetResource(source6.SubscribeSafe(_observer6));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value);
@@ -315,7 +315,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer6.SetResource(source6.SubscribeSafe(_observer6));
                 _observer7.SetResource(source7.SubscribeSafe(_observer7));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value);
@@ -392,7 +392,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer7.SetResource(source7.SubscribeSafe(_observer7));
                 _observer8.SetResource(source8.SubscribeSafe(_observer8));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value);
@@ -474,7 +474,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer8.SetResource(source8.SubscribeSafe(_observer8));
                 _observer9.SetResource(source9.SubscribeSafe(_observer9));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value);
@@ -561,7 +561,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer9.SetResource(source9.SubscribeSafe(_observer9));
                 _observer10.SetResource(source10.SubscribeSafe(_observer10));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value);
@@ -653,7 +653,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer10.SetResource(source10.SubscribeSafe(_observer10));
                 _observer11.SetResource(source11.SubscribeSafe(_observer11));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value);
@@ -750,7 +750,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer11.SetResource(source11.SubscribeSafe(_observer11));
                 _observer12.SetResource(source12.SubscribeSafe(_observer12));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value);
@@ -852,7 +852,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer12.SetResource(source12.SubscribeSafe(_observer12));
                 _observer13.SetResource(source13.SubscribeSafe(_observer13));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value, _observer13.Value);
@@ -959,7 +959,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer13.SetResource(source13.SubscribeSafe(_observer13));
                 _observer14.SetResource(source14.SubscribeSafe(_observer14));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value, _observer13.Value, _observer14.Value);
@@ -1071,7 +1071,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer14.SetResource(source14.SubscribeSafe(_observer14));
                 _observer15.SetResource(source15.SubscribeSafe(_observer15));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value, _observer13.Value, _observer14.Value, _observer15.Value);
@@ -1188,7 +1188,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer15.SetResource(source15.SubscribeSafe(_observer15));
                 _observer16.SetResource(source16.SubscribeSafe(_observer16));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value, _observer13.Value, _observer14.Value, _observer15.Value, _observer16.Value);

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.Generated.tt

@@ -96,7 +96,7 @@ for (var j = 1; j <= i; j++)
 }
 #>
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(<#=vs#>);

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs

@@ -438,7 +438,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     d.Disposable = srcs[j].SubscribeSafe(o);
                 }
 
-                SetUpstream(StableCompositeDisposable.Create(_subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(_subscriptions));
             }
 
             private void OnNext(int index, TSource value)

+ 14 - 14
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.Generated.cs

@@ -65,7 +65,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer2.SetResource(source2.SubscribeSafe(_observer2));
                 _observer3.SetResource(source3.SubscribeSafe(_observer3));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue());
@@ -133,7 +133,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer3.SetResource(source3.SubscribeSafe(_observer3));
                 _observer4.SetResource(source4.SubscribeSafe(_observer4));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue());
@@ -209,7 +209,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer4.SetResource(source4.SubscribeSafe(_observer4));
                 _observer5.SetResource(source5.SubscribeSafe(_observer5));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue());
@@ -293,7 +293,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer5.SetResource(source5.SubscribeSafe(_observer5));
                 _observer6.SetResource(source6.SubscribeSafe(_observer6));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue());
@@ -385,7 +385,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer6.SetResource(source6.SubscribeSafe(_observer6));
                 _observer7.SetResource(source7.SubscribeSafe(_observer7));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue());
@@ -485,7 +485,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer7.SetResource(source7.SubscribeSafe(_observer7));
                 _observer8.SetResource(source8.SubscribeSafe(_observer8));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue());
@@ -593,7 +593,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer8.SetResource(source8.SubscribeSafe(_observer8));
                 _observer9.SetResource(source9.SubscribeSafe(_observer9));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue());
@@ -709,7 +709,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer9.SetResource(source9.SubscribeSafe(_observer9));
                 _observer10.SetResource(source10.SubscribeSafe(_observer10));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue());
@@ -833,7 +833,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer10.SetResource(source10.SubscribeSafe(_observer10));
                 _observer11.SetResource(source11.SubscribeSafe(_observer11));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue());
@@ -965,7 +965,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer11.SetResource(source11.SubscribeSafe(_observer11));
                 _observer12.SetResource(source12.SubscribeSafe(_observer12));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue());
@@ -1105,7 +1105,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer12.SetResource(source12.SubscribeSafe(_observer12));
                 _observer13.SetResource(source13.SubscribeSafe(_observer13));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue(), _observer13.Values.Dequeue());
@@ -1253,7 +1253,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer13.SetResource(source13.SubscribeSafe(_observer13));
                 _observer14.SetResource(source14.SubscribeSafe(_observer14));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue(), _observer13.Values.Dequeue(), _observer14.Values.Dequeue());
@@ -1409,7 +1409,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer14.SetResource(source14.SubscribeSafe(_observer14));
                 _observer15.SetResource(source15.SubscribeSafe(_observer15));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue(), _observer13.Values.Dequeue(), _observer14.Values.Dequeue(), _observer15.Values.Dequeue());
@@ -1573,7 +1573,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer15.SetResource(source15.SubscribeSafe(_observer15));
                 _observer16.SetResource(source16.SubscribeSafe(_observer16));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue(), _observer13.Values.Dequeue(), _observer14.Values.Dequeue(), _observer15.Values.Dequeue(), _observer16.Values.Dequeue());

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.Generated.tt

@@ -97,7 +97,7 @@ for (var j = 1; j <= i; j++)
 }
 #>
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(<#=vs#>);

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Platforms/UWP/Linq/AsyncInfoObservable.cs

@@ -95,7 +95,7 @@ namespace System.Reactive.Linq
                     var dataSubscription = data.DefaultIfEmpty().Subscribe(obs);
                     var connection = data.Connect();
 
-                    return StableCompositeDisposable.Create(progressSubscription, dataSubscription, connection);
+                    return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
                 }).ToTask(ct);
             });
         }
@@ -218,7 +218,7 @@ namespace System.Reactive.Linq
                     var dataSubscription = resultSelector(data).Subscribe(obs);
                     var connection = data.Connect();
 
-                    return StableCompositeDisposable.Create(progressSubscription, dataSubscription, connection);
+                    return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
                 }).ToTask(ct);
             });
         }