ソースを参照

Adding StableCompositeDisposable to optimize memory footprint.

Bart De Smet 10 年 前
コミット
80817cccfe
37 ファイル変更297 行追加74 行削除
  1. 2 2
      Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs
  2. 1 1
      Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Async.cs
  3. 1 1
      Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs
  4. 144 0
      Rx.NET/Source/System.Reactive.Core/Reactive/Disposables/StableCompositeDisposable.cs
  5. 1 0
      Rx.NET/Source/System.Reactive.Core/System.Reactive.Core.csproj
  6. 1 1
      Rx.NET/Source/System.Reactive.Experimental/Reactive/Linq/QueryLanguageEx.cs
  7. 1 1
      Rx.NET/Source/System.Reactive.Experimental/Reactive/ListObservable.cs
  8. 1 1
      Rx.NET/Source/System.Reactive.Interfaces/Reactive/Disposables/ICancelable.cs
  9. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs
  10. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/GroupedObservable.cs
  11. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs
  12. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs
  13. 3 3
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs
  14. 16 16
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs
  15. 3 3
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs
  16. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs
  17. 2 2
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs
  18. 4 4
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
  19. 4 4
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs
  20. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs
  21. 2 2
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs
  22. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs
  23. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs
  24. 2 2
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs
  25. 2 2
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs
  26. 2 2
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs
  27. 3 3
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs
  28. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs
  29. 2 2
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs
  30. 2 2
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs
  31. 2 2
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs
  32. 1 1
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Awaiter.cs
  33. 3 3
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs
  34. 1 1
      Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs
  35. 2 2
      Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs
  36. 2 2
      Rx.NET/Source/System.Reactive.WindowsRuntime/Linq/AsyncInfoObservable.cs
  37. 78 0
      Rx.NET/Source/Tests.System.Reactive/Tests/Disposables/DisposableTests.cs

+ 2 - 2
Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs

@@ -51,7 +51,7 @@ namespace System.Reactive.Concurrency
                     d.Disposable = action(this, state);
                     d.Disposable = action(this, state);
             }, null);
             }, null);
 
 
-            return new CompositeDisposable(
+            return StableCompositeDisposable.Create(
                 d,
                 d,
                 cancel
                 cancel
             );
             );
@@ -83,7 +83,7 @@ namespace System.Reactive.Concurrency
                     d.Disposable = action(this, state);
                     d.Disposable = action(this, state);
             }, null, dt);
             }, null, dt);
 
 
-            return new CompositeDisposable(
+            return StableCompositeDisposable.Create(
                 d,
                 d,
                 cancel
                 cancel
             );
             );

+ 1 - 1
Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Async.cs

@@ -376,7 +376,7 @@ namespace System.Reactive.Concurrency
                 d.Disposable = t.Result;
                 d.Disposable = t.Result;
             }, TaskContinuationOptions.ExecuteSynchronously);
             }, TaskContinuationOptions.ExecuteSynchronously);
 
 
-            return new CompositeDisposable(c, d);
+            return StableCompositeDisposable.Create(c, d);
         }
         }
 
 
         private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)
         private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)

+ 1 - 1
Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs

@@ -76,7 +76,7 @@ namespace System.Reactive.Concurrency
                     _parent._context.OperationCompleted();
                     _parent._context.OperationCompleted();
                 });
                 });
 
 
-                return new CompositeDisposable(d, c);
+                return StableCompositeDisposable.Create(d, c);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)

+ 144 - 0
Rx.NET/Source/System.Reactive.Core/Reactive/Disposables/StableCompositeDisposable.cs

@@ -0,0 +1,144 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Threading;
+
+namespace System.Reactive.Disposables
+{
+    /// <summary>
+    /// Represents a group of disposable resources that are disposed together.
+    /// </summary>
+    public abstract class StableCompositeDisposable : ICancelable
+    {
+        /// <summary>
+        /// Creates a new group containing two disposable resources that are disposed together.
+        /// </summary>
+        /// <param name="disposable1">The first disposable resoruce to add to the group.</param>
+        /// <param name="disposable2">The second disposable resoruce to add to the group.</param>
+        /// <returns>Group of disposable resources that are disposed together.</returns>
+        public static ICancelable Create(IDisposable disposable1, IDisposable disposable2)
+        {
+            if (disposable1 == null)
+                throw new ArgumentNullException("disposable1");
+            if (disposable2 == null)
+                throw new ArgumentNullException("disposable2");
+
+            return new Binary(disposable1, disposable2);
+        }
+
+        /// <summary>
+        /// Creates a new group of disposable resources that are disposed together.
+        /// </summary>
+        /// <param name="disposables">Disposable resources to add to the group.</param>
+        /// <returns>Group of disposable resources that are disposed together.</returns>
+        public static ICancelable Create(params IDisposable[] disposables)
+        {
+            if (disposables == null)
+                throw new ArgumentNullException("disposables");
+
+            return new NAry(disposables);
+        }
+
+        /// <summary>
+        /// Creates a new group of disposable resources that are disposed together.
+        /// </summary>
+        /// <param name="disposables">Disposable resources to add to the group.</param>
+        /// <returns>Group of disposable resources that are disposed together.</returns>
+        public static ICancelable Create(IEnumerable<IDisposable> disposables)
+        {
+            if (disposables == null)
+                throw new ArgumentNullException("disposables");
+
+            return new NAry(disposables);
+        }
+
+        /// <summary>
+        /// Disposes all disposables in the group.
+        /// </summary>
+        public abstract void Dispose();
+
+        /// <summary>
+        /// Gets a value that indicates whether the object is disposed.
+        /// </summary>
+        public abstract bool IsDisposed
+        {
+            get;
+        }
+
+        class Binary : StableCompositeDisposable
+        {
+            private volatile IDisposable _disposable1;
+            private volatile IDisposable _disposable2;
+
+            public Binary(IDisposable disposable1, IDisposable disposable2)
+            {
+                _disposable1 = disposable1;
+                _disposable2 = disposable2;
+            }
+
+            public override bool IsDisposed
+            {
+                get
+                {
+                    return _disposable1 == null;
+                }
+            }
+
+            public override void Dispose()
+            {
+                var old1 = Interlocked.Exchange(ref _disposable1, null);
+                if (old1 != null)
+                {
+                    old1.Dispose();
+                }
+
+                var old2 = Interlocked.Exchange(ref _disposable2, null);
+                if (old2 != null)
+                {
+                    old2.Dispose();
+                }
+            }
+        }
+
+        class NAry : StableCompositeDisposable
+        {
+            private volatile List<IDisposable> _disposables;
+
+            public NAry(IDisposable[] disposables)
+                : this((IEnumerable<IDisposable>)disposables)
+            {
+            }
+
+            public NAry(IEnumerable<IDisposable> disposables)
+            {
+                _disposables = new List<IDisposable>(disposables);
+
+                //
+                // Doing this on the list to avoid duplicate enumeration of disposables.
+                //
+                if (_disposables.Contains(null))
+                    throw new ArgumentException(Strings_Core.DISPOSABLES_CANT_CONTAIN_NULL, "disposables");
+            }
+
+            public override bool IsDisposed
+            {
+                get
+                {
+                    return _disposables == null;
+                }
+            }
+
+            public override void Dispose()
+            {
+                var old = Interlocked.Exchange(ref _disposables, null);
+                if (old != null)
+                {
+                    foreach (var d in old)
+                    {
+                        d.Dispose();
+                    }
+                }
+            }
+        }
+    }
+}

+ 1 - 0
Rx.NET/Source/System.Reactive.Core/System.Reactive.Core.csproj

@@ -69,6 +69,7 @@
     <Compile Include="Reactive\Concurrency\Synchronization.Synchronize.cs" />
     <Compile Include="Reactive\Concurrency\Synchronization.Synchronize.cs" />
     <Compile Include="Reactive\Concurrency\SynchronizationContextScheduler.cs" />
     <Compile Include="Reactive\Concurrency\SynchronizationContextScheduler.cs" />
     <Compile Include="Reactive\Concurrency\DefaultScheduler.cs" />
     <Compile Include="Reactive\Concurrency\DefaultScheduler.cs" />
+    <Compile Include="Reactive\Disposables\StableCompositeDisposable.cs" />
     <Compile Include="Reactive\Internal\AsyncLockObserver.cs" />
     <Compile Include="Reactive\Internal\AsyncLockObserver.cs" />
     <Compile Include="Reactive\Internal\CheckedObserver.cs" />
     <Compile Include="Reactive\Internal\CheckedObserver.cs" />
     <Compile Include="Reactive\Internal\ConcurrentDictionary.cs" />
     <Compile Include="Reactive\Internal\ConcurrentDictionary.cs" />

+ 1 - 1
Rx.NET/Source/System.Reactive.Experimental/Reactive/Linq/QueryLanguageEx.cs

@@ -436,7 +436,7 @@ namespace System.Reactive.Linq
                 leftSubscription.Disposable = leftSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(x)).Synchronize(gate).Subscribe(combiner);
                 leftSubscription.Disposable = leftSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(x)).Synchronize(gate).Subscribe(combiner);
                 rightSubscription.Disposable = rightSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(x)).Synchronize(gate).Subscribe(combiner);
                 rightSubscription.Disposable = rightSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(x)).Synchronize(gate).Subscribe(combiner);
 
 
-                return new CompositeDisposable(leftSubscription, rightSubscription);
+                return StableCompositeDisposable.Create(leftSubscription, rightSubscription);
             });
             });
         }
         }
 
 

+ 1 - 1
Rx.NET/Source/System.Reactive.Experimental/Reactive/ListObservable.cs

@@ -199,7 +199,7 @@ namespace System.Reactive
             if (observer == null)
             if (observer == null)
                 throw new ArgumentNullException("observer");
                 throw new ArgumentNullException("observer");
 
 
-            return new CompositeDisposable(subscription, subject.Subscribe(observer));
+            return StableCompositeDisposable.Create(subscription, subject.Subscribe(observer));
         }
         }
     }
     }
 }
 }

+ 1 - 1
Rx.NET/Source/System.Reactive.Interfaces/Reactive/Disposables/ICancelable.cs

@@ -3,7 +3,7 @@
 namespace System.Reactive.Disposables
 namespace System.Reactive.Disposables
 {
 {
     /// <summary>
     /// <summary>
-    /// Disposable resource with dipsosal state tracking.
+    /// Disposable resource with disposal state tracking.
     /// </summary>
     /// </summary>
     public interface ICancelable : IDisposable
     public interface ICancelable : IDisposable
     {
     {

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs

@@ -42,7 +42,7 @@ namespace System.Reactive
                 _gate.Wait(MoveNext);
                 _gate.Wait(MoveNext);
             });
             });
 
 
-            return new CompositeDisposable(_subscription, cancelable, Disposable.Create(() => _gate.Wait(Dispose)));
+            return StableCompositeDisposable.Create(_subscription, cancelable, Disposable.Create(() => _gate.Wait(Dispose)));
         }
         }
 
 
         protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source);
         protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source);

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/GroupedObservable.cs

@@ -38,7 +38,7 @@ namespace System.Reactive.Linq
                 //
                 //
                 var release = _refCount.GetDisposable();
                 var release = _refCount.GetDisposable();
                 var subscription = _subject.Subscribe/*Unsafe*/(observer);
                 var subscription = _subject.Subscribe/*Unsafe*/(observer);
-                return new CompositeDisposable(release, subscription);
+                return StableCompositeDisposable.Create(release, subscription);
             }
             }
             else
             else
             {
             {

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs

@@ -19,7 +19,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
         {
-            var d = new CompositeDisposable(_refCount.GetDisposable(), cancel);
+            var d = StableCompositeDisposable.Create(_refCount.GetDisposable(), cancel);
 
 
             var sink = new _(observer, d);
             var sink = new _(observer, d);
             setSink(sink);
             setSink(sink);

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs

@@ -40,7 +40,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             {
                 var ls = new SingleAssignmentDisposable();
                 var ls = new SingleAssignmentDisposable();
                 var rs = new SingleAssignmentDisposable();
                 var rs = new SingleAssignmentDisposable();
-                var d = new CompositeDisposable(ls, rs);
+                var d = StableCompositeDisposable.Create(ls, rs);
 
 
                 var gate = new object();
                 var gate = new object();
 
 

+ 3 - 3
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs

@@ -178,7 +178,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var subscription = _parent._source.SubscribeSafe(this);
                 var subscription = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable { _timerD, subscription };
+                return StableCompositeDisposable.Create(_timerD, subscription);
             }
             }
 
 
             private void CreateWindow()
             private void CreateWindow()
@@ -304,7 +304,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 var d = _parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick);
                 var d = _parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick);
                 var s = _parent._source.SubscribeSafe(this);
                 var s = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(d, s);
+                return StableCompositeDisposable.Create(d, s);
             }
             }
 
 
             private void Tick()
             private void Tick()
@@ -377,7 +377,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var subscription = _parent._source.SubscribeSafe(this);
                 var subscription = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable { _timerD, subscription };
+                return StableCompositeDisposable.Create(_timerD, subscription);
             }
             }
 
 
             private void CreateTimer(int id)
             private void CreateTimer(int id)

+ 16 - 16
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs

@@ -59,7 +59,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);
                 fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);
                 sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);
                 sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);
 
 
-                return new CompositeDisposable(fstSubscription, sndSubscription);
+                return StableCompositeDisposable.Create(fstSubscription, sndSubscription);
             }
             }
 
 
             class F : IObserver<TFirst>
             class F : IObserver<TFirst>
@@ -280,7 +280,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
                 subscriptions[1].Disposable = _parent._source2.SubscribeSafe(_observer2);
                 subscriptions[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
                 subscriptions[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -345,7 +345,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
                 subscriptions[2].Disposable = _parent._source3.SubscribeSafe(_observer3);
                 subscriptions[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
                 subscriptions[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -417,7 +417,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
                 subscriptions[3].Disposable = _parent._source4.SubscribeSafe(_observer4);
                 subscriptions[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
                 subscriptions[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -492,7 +492,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
                 subscriptions[4].Disposable = _parent._source5.SubscribeSafe(_observer5);
                 subscriptions[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
                 subscriptions[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -572,7 +572,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
                 subscriptions[5].Disposable = _parent._source6.SubscribeSafe(_observer6);
                 subscriptions[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
                 subscriptions[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -657,7 +657,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
                 subscriptions[6].Disposable = _parent._source7.SubscribeSafe(_observer7);
                 subscriptions[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
                 subscriptions[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -747,7 +747,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
                 subscriptions[7].Disposable = _parent._source8.SubscribeSafe(_observer8);
                 subscriptions[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
                 subscriptions[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -842,7 +842,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
                 subscriptions[8].Disposable = _parent._source9.SubscribeSafe(_observer9);
                 subscriptions[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
                 subscriptions[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -942,7 +942,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
                 subscriptions[9].Disposable = _parent._source10.SubscribeSafe(_observer10);
                 subscriptions[10].Disposable = _parent._source11.SubscribeSafe(_observer11);
                 subscriptions[10].Disposable = _parent._source11.SubscribeSafe(_observer11);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -1047,7 +1047,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[10].Disposable = _parent._source11.SubscribeSafe(_observer11);
                 subscriptions[10].Disposable = _parent._source11.SubscribeSafe(_observer11);
                 subscriptions[11].Disposable = _parent._source12.SubscribeSafe(_observer12);
                 subscriptions[11].Disposable = _parent._source12.SubscribeSafe(_observer12);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -1157,7 +1157,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[11].Disposable = _parent._source12.SubscribeSafe(_observer12);
                 subscriptions[11].Disposable = _parent._source12.SubscribeSafe(_observer12);
                 subscriptions[12].Disposable = _parent._source13.SubscribeSafe(_observer13);
                 subscriptions[12].Disposable = _parent._source13.SubscribeSafe(_observer13);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -1272,7 +1272,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[12].Disposable = _parent._source13.SubscribeSafe(_observer13);
                 subscriptions[12].Disposable = _parent._source13.SubscribeSafe(_observer13);
                 subscriptions[13].Disposable = _parent._source14.SubscribeSafe(_observer14);
                 subscriptions[13].Disposable = _parent._source14.SubscribeSafe(_observer14);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -1392,7 +1392,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[13].Disposable = _parent._source14.SubscribeSafe(_observer14);
                 subscriptions[13].Disposable = _parent._source14.SubscribeSafe(_observer14);
                 subscriptions[14].Disposable = _parent._source15.SubscribeSafe(_observer15);
                 subscriptions[14].Disposable = _parent._source15.SubscribeSafe(_observer15);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -1517,7 +1517,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 subscriptions[14].Disposable = _parent._source15.SubscribeSafe(_observer15);
                 subscriptions[14].Disposable = _parent._source15.SubscribeSafe(_observer15);
                 subscriptions[15].Disposable = _parent._source16.SubscribeSafe(_observer16);
                 subscriptions[15].Disposable = _parent._source16.SubscribeSafe(_observer16);
 
 
-                return new CompositeDisposable(subscriptions);
+                return StableCompositeDisposable.Create(subscriptions);
             }
             }
 
 
             protected override TResult GetResult()
             protected override TResult GetResult()
@@ -1765,7 +1765,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     d.Disposable = srcs[j].SubscribeSafe(o);
                     d.Disposable = srcs[j].SubscribeSafe(o);
                 }
                 }
 
 
-                return new CompositeDisposable(_subscriptions);
+                return StableCompositeDisposable.Create(_subscriptions);
             }
             }
 
 
             private void OnNext(int index, TSource value)
             private void OnNext(int index, TSource value)

+ 3 - 3
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs

@@ -111,7 +111,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _sourceSubscription = sourceSubscription;
                 _sourceSubscription = sourceSubscription;
                 sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
                 sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(_sourceSubscription, _cancelable);
+                return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
             }
             }
 
 
             private void Start()
             private void Start()
@@ -386,7 +386,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _sourceSubscription = sourceSubscription;
                 _sourceSubscription = sourceSubscription;
                 sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
                 sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(_sourceSubscription, _cancelable);
+                return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
             }
             }
 
 
             private void Start()
             private void Start()
@@ -632,7 +632,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelay(this));
                     _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelay(this));
                 }
                 }
 
 
-                return new CompositeDisposable(_subscription, _delays);
+                return StableCompositeDisposable.Create(_subscription, _delays);
             }
             }
 
 
             private void Start()
             private void Start()

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs

@@ -57,7 +57,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 var subscription = observable.SubscribeSafe(this);
                 var subscription = observable.SubscribeSafe(this);
                 var connection = connectable.Connect();
                 var connection = connectable.Connect();
 
 
-                return new CompositeDisposable(subscription, connection);
+                return StableCompositeDisposable.Create(subscription, connection);
             }
             }
 
 
             public void OnNext(TResult value)
             public void OnNext(TResult value)

+ 2 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs

@@ -53,7 +53,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var samplerSubscription = _parent._sampler.SubscribeSafe(new SampleImpl(this));
                 var samplerSubscription = _parent._sampler.SubscribeSafe(new SampleImpl(this));
 
 
-                return new CompositeDisposable(_sourceSubscription, samplerSubscription);
+                return StableCompositeDisposable.Create(_sourceSubscription, samplerSubscription);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)
@@ -187,7 +187,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _sourceSubscription = sourceSubscription;
                 _sourceSubscription = sourceSubscription;
                 sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
                 sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(
+                return StableCompositeDisposable.Create(
                     sourceSubscription,
                     sourceSubscription,
                     _parent._scheduler.SchedulePeriodic(_parent._interval, Tick)
                     _parent._scheduler.SchedulePeriodic(_parent._interval, Tick)
                 );
                 );

+ 4 - 4
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs

@@ -606,7 +606,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _cancel = new CancellationDisposable();
                 _cancel = new CancellationDisposable();
                 _count = 1;
                 _count = 1;
 
 
-                return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+                return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)
@@ -741,7 +741,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _cancel = new CancellationDisposable();
                 _cancel = new CancellationDisposable();
                 _count = 1;
                 _count = 1;
 
 
-                return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+                return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)
@@ -1532,7 +1532,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _cancel = new CancellationDisposable();
                 _cancel = new CancellationDisposable();
                 _count = 1;
                 _count = 1;
 
 
-                return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+                return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)
@@ -1643,7 +1643,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _cancel = new CancellationDisposable();
                 _cancel = new CancellationDisposable();
                 _count = 1;
                 _count = 1;
 
 
-                return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+                return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)

+ 4 - 4
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs

@@ -68,11 +68,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 _ql = new Queue<TSource>();
                 _ql = new Queue<TSource>();
                 _qr = new Queue<TSource>();
                 _qr = new Queue<TSource>();
 
 
-                return new CompositeDisposable
-                {
+                return StableCompositeDisposable.Create
+                (
                     _parent._first.SubscribeSafe(new F(this)),
                     _parent._first.SubscribeSafe(new F(this)),
                     _parent._second.SubscribeSafe(new S(this))
                     _parent._second.SubscribeSafe(new S(this))
-                };
+                );
             }
             }
 
 
             class F : IObserver<TSource>
             class F : IObserver<TSource>
@@ -258,7 +258,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     return Disposable.Empty;
                     return Disposable.Empty;
                 }
                 }
 
 
-                return new CompositeDisposable(
+                return StableCompositeDisposable.Create(
                     _parent._first.SubscribeSafe(this),
                     _parent._first.SubscribeSafe(this),
                     _enumerator
                     _enumerator
                 );
                 );

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs

@@ -120,7 +120,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             {
                 var t = _parent._scheduler.Schedule(_parent._duration, Tick);
                 var t = _parent._scheduler.Schedule(_parent._duration, Tick);
                 var d = _parent._source.SubscribeSafe(this);
                 var d = _parent._source.SubscribeSafe(this);
-                return new CompositeDisposable(t, d);
+                return StableCompositeDisposable.Create(t, d);
             }
             }
 
 
             private void Tick()
             private void Tick()

+ 2 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs

@@ -47,7 +47,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 sourceObserver.Disposable = sourceSubscription;
                 sourceObserver.Disposable = sourceSubscription;
                 otherObserver.Disposable = otherSubscription;
                 otherObserver.Disposable = otherSubscription;
 
 
-                return new CompositeDisposable(
+                return StableCompositeDisposable.Create(
                     sourceSubscription,
                     sourceSubscription,
                     otherSubscription
                     otherSubscription
                 );
                 );
@@ -179,7 +179,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             {
                 var t = _parent._scheduler.Schedule(_parent._startTime, Tick);
                 var t = _parent._scheduler.Schedule(_parent._startTime, Tick);
                 var d = _parent._source.SubscribeSafe(this);
                 var d = _parent._source.SubscribeSafe(this);
-                return new CompositeDisposable(t, d);
+                return StableCompositeDisposable.Create(t, d);
             }
             }
 
 
             private void Tick()
             private void Tick()

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs

@@ -51,7 +51,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _subscription = subscription;
                 _subscription = subscription;
                 subscription.Disposable = _parent._sources.SubscribeSafe(this);
                 subscription.Disposable = _parent._sources.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(_subscription, _innerSubscription);
+                return StableCompositeDisposable.Create(_subscription, _innerSubscription);
             }
             }
 
 
             public void OnNext(IObservable<TSource> value)
             public void OnNext(IObservable<TSource> value)

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs

@@ -133,7 +133,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var t = _parent._scheduler.Schedule(_parent._duration, Tick);
                 var t = _parent._scheduler.Schedule(_parent._duration, Tick);
                 var d = _parent._source.SubscribeSafe(this);
                 var d = _parent._source.SubscribeSafe(this);
-                return new CompositeDisposable(t, d);
+                return StableCompositeDisposable.Create(t, d);
             }
             }
 
 
             private void Tick()
             private void Tick()

+ 2 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs

@@ -69,7 +69,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 _subscription.Disposable = _parent._source.SubscribeSafe(this);
                 _subscription.Disposable = _parent._source.SubscribeSafe(this);
                 
                 
-                return new CompositeDisposable(_subscription, _loop);
+                return StableCompositeDisposable.Create(_subscription, _loop);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)
@@ -155,7 +155,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _watch = _parent._scheduler.StartStopwatch();
                 _watch = _parent._scheduler.StartStopwatch();
                 _subscription.Disposable = _parent._source.SubscribeSafe(this);
                 _subscription.Disposable = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(_subscription, _loop);
+                return StableCompositeDisposable.Create(_subscription, _loop);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)

+ 2 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs

@@ -47,7 +47,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var sourceSubscription = _parent._source.SubscribeSafe(sourceObserver);
                 var sourceSubscription = _parent._source.SubscribeSafe(sourceObserver);
 
 
-                return new CompositeDisposable(
+                return StableCompositeDisposable.Create(
                     otherSubscription,
                     otherSubscription,
                     sourceSubscription
                     sourceSubscription
                 );
                 );
@@ -215,7 +215,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var t = _parent._scheduler.Schedule(_parent._endTime, Tick);
                 var t = _parent._scheduler.Schedule(_parent._endTime, Tick);
                 var d = _parent._source.SubscribeSafe(this);
                 var d = _parent._source.SubscribeSafe(this);
-                return new CompositeDisposable(t, d);
+                return StableCompositeDisposable.Create(t, d);
             }
             }
 
 
             private void Tick()
             private void Tick()

+ 2 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs

@@ -53,7 +53,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var subscription = _parent._source.SubscribeSafe(this);
                 var subscription = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(subscription, _cancelable);
+                return StableCompositeDisposable.Create(subscription, _cancelable);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)
@@ -160,7 +160,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var subscription = _parent._source.SubscribeSafe(this);
                 var subscription = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(subscription, _cancelable);
+                return StableCompositeDisposable.Create(subscription, _cancelable);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)

+ 3 - 3
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs

@@ -75,7 +75,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 original.Disposable = _parent._source.SubscribeSafe(this);
                 original.Disposable = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(_subscription, timer);
+                return StableCompositeDisposable.Create(_subscription, timer);
             }
             }
 
 
             private void Timeout()
             private void Timeout()
@@ -169,7 +169,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 original.Disposable = _parent._source.SubscribeSafe(this);
                 original.Disposable = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(_subscription, _timer);
+                return StableCompositeDisposable.Create(_subscription, _timer);
             }
             }
 
 
             private void CreateTimer()
             private void CreateTimer()
@@ -309,7 +309,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 original.Disposable = _parent._source.SubscribeSafe(this);
                 original.Disposable = _parent._source.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(_subscription, _timer);
+                return StableCompositeDisposable.Create(_subscription, _timer);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs

@@ -210,7 +210,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     var c = new SingleAssignmentDisposable();
                     var c = new SingleAssignmentDisposable();
                     c.Disposable = self.Schedule(1L, CatchUp);
                     c.Disposable = self.Schedule(1L, CatchUp);
 
 
-                    return new CompositeDisposable(2) { d, c };
+                    return StableCompositeDisposable.Create(d, c);
                 }
                 }
 
 
                 return d;
                 return d;

+ 2 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs

@@ -48,10 +48,10 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
                 }
                 catch (Exception exception)
                 catch (Exception exception)
                 {
                 {
-                    return new CompositeDisposable(Observable.Throw<TSource>(exception).SubscribeSafe(this), disposable);
+                    return StableCompositeDisposable.Create(Observable.Throw<TSource>(exception).SubscribeSafe(this), disposable);
                 }
                 }
 
 
-                return new CompositeDisposable(source.SubscribeSafe(this), disposable);
+                return StableCompositeDisposable.Create(source.SubscribeSafe(this), disposable);
             }
             }
 
 
             public void OnNext(TSource value)
             public void OnNext(TSource value)

+ 2 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs

@@ -76,7 +76,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);
                 fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);
                 sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);
                 sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);
 
 
-                return new CompositeDisposable(fstSubscription, sndSubscription, fstO, sndO);
+                return StableCompositeDisposable.Create(fstSubscription, sndSubscription, fstO, sndO);
             }
             }
 
 
             class F : IObserver<TFirst>, IDisposable
             class F : IObserver<TFirst>, IDisposable
@@ -292,7 +292,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var leftSubscription = _parent._first.SubscribeSafe(this);
                 var leftSubscription = _parent._first.SubscribeSafe(this);
 
 
-                return new CompositeDisposable(leftSubscription, _rightEnumerator);
+                return StableCompositeDisposable.Create(leftSubscription, _rightEnumerator);
             }
             }
 
 
             public void OnNext(TFirst value)
             public void OnNext(TFirst value)

+ 2 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs

@@ -701,7 +701,7 @@ namespace System.Reactive.Linq
                 // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
                 // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
                 //
                 //
                 var subscription = result.Subscribe/*Unsafe*/(observer);
                 var subscription = result.Subscribe/*Unsafe*/(observer);
-                return new CompositeDisposable(cancellable, subscription);
+                return StableCompositeDisposable.Create(cancellable, subscription);
             });
             });
         }
         }
 #endif
 #endif
@@ -758,7 +758,7 @@ namespace System.Reactive.Linq
                 // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
                 // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
                 //
                 //
                 var subscription = result.Subscribe/*Unsafe*/(observer);
                 var subscription = result.Subscribe/*Unsafe*/(observer);
-                return new CompositeDisposable(cancellable, subscription);
+                return StableCompositeDisposable.Create(cancellable, subscription);
             });
             });
         }
         }
 #endif
 #endif

+ 1 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Awaiter.cs

@@ -53,7 +53,7 @@ namespace System.Reactive.Linq
 
 
             if (cancellationToken.CanBeCanceled)
             if (cancellationToken.CanBeCanceled)
             {
             {
-                RegisterCancelation(s, new CompositeDisposable(d, c), cancellationToken);
+                RegisterCancelation(s, StableCompositeDisposable.Create(d, c), cancellationToken);
             }
             }
 
 
             return s;
             return s;

+ 3 - 3
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs

@@ -50,7 +50,7 @@ namespace System.Reactive.Linq
                 var taskCompletionObserver = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted);
                 var taskCompletionObserver = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted);
                 var subscription = taskObservable.Subscribe(taskCompletionObserver);
                 var subscription = taskObservable.Subscribe(taskCompletionObserver);
 
 
-                return new CompositeDisposable(cancellable, subscription);
+                return StableCompositeDisposable.Create(cancellable, subscription);
             });
             });
         }
         }
 
 
@@ -75,7 +75,7 @@ namespace System.Reactive.Linq
                 //
                 //
                 taskObservable.Subscribe(taskCompletionObserver);
                 taskObservable.Subscribe(taskCompletionObserver);
 
 
-                return new CompositeDisposable(cancellable, subscription);
+                return StableCompositeDisposable.Create(cancellable, subscription);
             });
             });
         }
         }
 
 
@@ -100,7 +100,7 @@ namespace System.Reactive.Linq
                 //
                 //
                 taskObservable.Subscribe(taskCompletionObserver);
                 taskObservable.Subscribe(taskCompletionObserver);
 
 
-                return new CompositeDisposable(cancellable, subscription);
+                return StableCompositeDisposable.Create(cancellable, subscription);
             });
             });
         }
         }
 
 

+ 1 - 1
Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs

@@ -226,7 +226,7 @@ namespace System.Reactive.Concurrency
 
 
             d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick);
             d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick);
 
 
-            return new CompositeDisposable(d, gate);
+            return StableCompositeDisposable.Create(d, gate);
         }
         }
 
 
 #if !NO_STOPWATCH
 #if !NO_STOPWATCH

+ 2 - 2
Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs

@@ -214,7 +214,7 @@ namespace System.Reactive.Concurrency
 
 
             moveNext();
             moveNext();
 
 
-            return new CompositeDisposable(cancel, gate);
+            return StableCompositeDisposable.Create(cancel, gate);
 #else
 #else
             var state1 = state;
             var state1 = state;
             var gate = new AsyncLock();
             var gate = new AsyncLock();
@@ -230,7 +230,7 @@ namespace System.Reactive.Concurrency
                 });
                 });
             }, period);
             }, period);
 
 
-            return new CompositeDisposable(timer, gate);
+            return ImmutableCompositeDisposable.Create(timer, gate);
 #endif
 #endif
         }
         }
     }
     }

+ 2 - 2
Rx.NET/Source/System.Reactive.WindowsRuntime/Linq/AsyncInfoObservable.cs

@@ -85,7 +85,7 @@ namespace System.Reactive.Linq
                     var dataSubscription = data.DefaultIfEmpty().Subscribe(obs);
                     var dataSubscription = data.DefaultIfEmpty().Subscribe(obs);
                     var connection = data.Connect();
                     var connection = data.Connect();
 
 
-                    return new CompositeDisposable(progressSubscription, dataSubscription, connection);
+                    return StableCompositeDisposable.Create(progressSubscription, dataSubscription, connection);
                 }).ToTask(ct);
                 }).ToTask(ct);
             });
             });
         }
         }
@@ -191,7 +191,7 @@ namespace System.Reactive.Linq
                     var dataSubscription = resultSelector(data).Subscribe(obs);
                     var dataSubscription = resultSelector(data).Subscribe(obs);
                     var connection = data.Connect();
                     var connection = data.Connect();
 
 
-                    return new CompositeDisposable(progressSubscription, dataSubscription, connection);
+                    return StableCompositeDisposable.Create(progressSubscription, dataSubscription, connection);
                 }).ToTask(ct);
                 }).ToTask(ct);
             });
             });
         }
         }

+ 78 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Disposables/DisposableTests.cs

@@ -636,5 +636,83 @@ namespace ReactiveTests.Tests
             m.Disposable.Dispose();        // This should be a nop.
             m.Disposable.Dispose();        // This should be a nop.
             Assert.IsTrue(m.IsDisposed);
             Assert.IsTrue(m.IsDisposed);
         }
         }
+
+        [TestMethod]
+        public void StableCompositeDisposable_ArgumentChecking()
+        {
+            var d = Disposable.Empty;
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => StableCompositeDisposable.Create(null, d));
+            ReactiveAssert.Throws<ArgumentNullException>(() => StableCompositeDisposable.Create(d, null));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => StableCompositeDisposable.Create(default(IDisposable[])));
+            ReactiveAssert.Throws<ArgumentNullException>(() => StableCompositeDisposable.Create(default(IEnumerable<IDisposable>)));
+
+            ReactiveAssert.Throws<ArgumentException>(() => StableCompositeDisposable.Create(null, d, d));
+            ReactiveAssert.Throws<ArgumentException>(() => StableCompositeDisposable.Create(d, null, d));
+            ReactiveAssert.Throws<ArgumentException>(() => StableCompositeDisposable.Create(d, d, null));
+        }
+
+        [TestMethod]
+        public void StableCompositeDisposable_Binary()
+        {
+            var disp1 = false;
+            var d1 = Disposable.Create(() => { Assert.IsFalse(disp1); disp1 = true; });
+
+            var disp2 = false;
+            var d2 = Disposable.Create(() => { Assert.IsFalse(disp2); disp2 = true; });
+
+            var d = StableCompositeDisposable.Create(d1, d2);
+
+            Assert.IsFalse(disp1);
+            Assert.IsFalse(disp2);
+            Assert.IsFalse(d.IsDisposed);
+
+            d.Dispose();
+
+            Assert.IsTrue(disp1);
+            Assert.IsTrue(disp2);
+            Assert.IsTrue(d.IsDisposed);
+
+            d.Dispose();
+
+            Assert.IsTrue(disp1);
+            Assert.IsTrue(disp2);
+            Assert.IsTrue(d.IsDisposed);
+        }
+
+        [TestMethod]
+        public void StableCompositeDisposable_Nary()
+        {
+            var disp1 = false;
+            var d1 = Disposable.Create(() => { Assert.IsFalse(disp1); disp1 = true; });
+
+            var disp2 = false;
+            var d2 = Disposable.Create(() => { Assert.IsFalse(disp2); disp2 = true; });
+
+            var disp3 = false;
+            var d3 = Disposable.Create(() => { Assert.IsFalse(disp3); disp3 = true; });
+
+            var d = StableCompositeDisposable.Create(d1, d2, d3);
+
+            Assert.IsFalse(disp1);
+            Assert.IsFalse(disp2);
+            Assert.IsFalse(disp3);
+            Assert.IsFalse(d.IsDisposed);
+
+            d.Dispose();
+
+            Assert.IsTrue(disp1);
+            Assert.IsTrue(disp2);
+            Assert.IsTrue(disp3);
+            Assert.IsTrue(d.IsDisposed);
+
+            d.Dispose();
+
+            Assert.IsTrue(disp1);
+            Assert.IsTrue(disp2);
+            Assert.IsTrue(disp3);
+            Assert.IsTrue(d.IsDisposed);
+        }
     }
     }
 }
 }