Browse Source

4.x: Inline StableCompositeDisposable.Create into the Sinks (#578)

David Karnok 7 years ago
parent
commit
d76c0b36f8
27 changed files with 483 additions and 204 deletions
  1. 25 4
      Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs
  2. 11 6
      Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.Emulation.cs
  3. 10 4
      Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs
  4. 0 5
      Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs
  5. 60 19
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs
  6. 1 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs
  7. 20 16
      Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs
  8. 10 2
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs
  9. 11 9
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Finally.cs
  10. 1 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupBy.cs
  11. 1 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs
  12. 13 4
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Multicast.cs
  13. 29 21
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Sample.cs
  14. 48 12
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs
  15. 38 10
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SequenceEqual.cs
  16. 13 3
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs
  17. 13 3
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs
  18. 14 9
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Switch.cs
  19. 13 3
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs
  20. 29 20
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs
  21. 13 3
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs
  22. 27 15
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs
  23. 28 13
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs
  24. 14 4
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Using.cs
  25. 3 3
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs
  26. 15 10
      Rx.NET/Source/src/System.Reactive/Linq/Observable/WithLatestFrom.cs
  27. 23 3
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs

+ 25 - 4
Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs

@@ -196,15 +196,17 @@ namespace System.Reactive.Concurrency
 
             var state1 = state;
 
-            var d = new MultipleAssignmentDisposable();
+            var td = new TernaryDisposable();
+
             var gate = new AsyncLock();
+            td.Extra = gate;
 
             var tick = default(Func<IScheduler, object, IDisposable>);
             tick = (self_, _) =>
             {
                 next += period;
 
-                d.Disposable = self_.Schedule(null, next - _stopwatch.Elapsed, tick);
+                td.Next = self_.Schedule(null, next - _stopwatch.Elapsed, tick);
 
                 gate.Wait(() =>
                 {
@@ -214,9 +216,28 @@ namespace System.Reactive.Concurrency
                 return Disposable.Empty;
             };
 
-            d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick);
+            td.First = Schedule(null, next - _stopwatch.Elapsed, tick);
+
+            return td;
+        }
+
+        private sealed class TernaryDisposable : IDisposable
+        {
+            private IDisposable _task;
+            private IDisposable _extra;
+
+            // If Next was called before this assignment is executed, it won't overwrite
+            // a more fresh IDisposable task
+            public IDisposable First { set { Disposable.TrySetSingle(ref _task, value); } }
+            // It is fine to overwrite the first or previous IDisposable task
+            public IDisposable Next { set { Disposable.TrySetMultiple(ref _task, value); } }
+            public IDisposable Extra { set { Disposable.SetSingle(ref _extra, value); } }
 
-            return StableCompositeDisposable.Create(d, gate);
+            public void Dispose()
+            {
+                Disposable.TryDispose(ref _task);
+                Disposable.TryDispose(ref _extra);
+            }
         }
 
         /// <summary>

+ 11 - 6
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.Emulation.cs

@@ -289,7 +289,7 @@ namespace System.Reactive.Concurrency
             }
         }
 
-        private sealed class SchedulePeriodicStopwatch<TState>
+        private sealed class SchedulePeriodicStopwatch<TState> : IDisposable
         {
             private readonly IScheduler _scheduler;
             private readonly TimeSpan _period;
@@ -341,6 +341,8 @@ namespace System.Reactive.Concurrency
             private const int SUSPENDED = 2;
             private const int DISPOSED = 3;
 
+            private IDisposable _task;
+
             public IDisposable Start()
             {
                 RegisterHostLifecycleEventHandlers();
@@ -349,11 +351,14 @@ namespace System.Reactive.Concurrency
                 _nextDue = _period;
                 _runState = RUNNING;
 
-                return StableCompositeDisposable.Create
-                (
-                    _scheduler.Schedule(_nextDue, Tick),
-                    Disposable.Create(Cancel)
-                );
+                Disposable.TrySetSingle(ref _task, _scheduler.Schedule(_nextDue, Tick));
+                return this;
+            }
+
+            void IDisposable.Dispose()
+            {
+                Disposable.TryDispose(ref _task);
+                Cancel();
             }
 
             private void Tick(Action<TimeSpan> recurse)

+ 10 - 4
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs

@@ -72,7 +72,7 @@ namespace System.Reactive.Concurrency
                     _context = context;
                 }
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     //
                     // The interactions with OperationStarted/OperationCompleted below allow
@@ -83,10 +83,16 @@ namespace System.Reactive.Concurrency
                     //
                     _context.OperationStarted();
 
-                    var d = source.SubscribeSafe(this);
-                    var c = Disposable.Create(_context.OperationCompleted);
+                    SetUpstream(source.SubscribeSafe(this));
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(d, c));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        _context.OperationCompleted();
+                    }
+                    base.Dispose(disposing);
                 }
 
                 public override void OnNext(TSource value)

+ 0 - 5
Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

@@ -363,11 +363,6 @@ namespace System.Reactive
 
         readonly ConcurrentQueue<T> queue;
 
-        /// <summary>
-        /// The disposable of the upstream source.
-        /// </summary>
-        IDisposable upstream;
-
         private IDisposable _run;
 
         /// <summary>

+ 60 - 19
Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs

@@ -124,7 +124,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private readonly object _gate = new object();
                 private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>();
-                private readonly SerialDisposable _timerD = new SerialDisposable();
+                
+                private IDisposable _timerSerial;
 
                 public _(TimeSliding parent, IObserver<IList<TSource>> observer)
                     : base(observer)
@@ -146,9 +147,16 @@ namespace System.Reactive.Linq.ObservableImpl
                     CreateWindow();
                     CreateTimer();
 
-                    var subscription = parent._source.SubscribeSafe(this);
+                    base.Run(parent._source);
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(_timerD, subscription));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _timerSerial);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private void CreateWindow()
@@ -160,7 +168,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 private void CreateTimer()
                 {
                     var m = new SingleAssignmentDisposable();
-                    _timerD.Disposable = m;
+
+                    Disposable.TrySetSerial(ref _timerSerial, m);
 
                     var isSpan = false;
                     var isShift = false;
@@ -281,14 +290,23 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private List<TSource> _list;
 
+                private IDisposable _periodicDisposable;
+
                 public void Run(TimeHopping parent)
                 {
                     _list = new List<TSource>();
 
-                    var d = parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick);
-                    var s = parent._source.SubscribeSafe(this);
+                    Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick));
+                    base.Run(parent._source);
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(d, s));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _periodicDisposable);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private void Tick()
@@ -353,7 +371,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly Ferry _parent;
 
                 private readonly object _gate = new object();
-                private readonly SerialDisposable _timerD = new SerialDisposable();
+                private IDisposable _timerSerial;
 
                 public _(Ferry parent, IObserver<IList<TSource>> observer)
                     : base(observer)
@@ -373,15 +391,22 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     CreateTimer(0);
 
-                    var subscription = _parent._source.SubscribeSafe(this);
+                    SetUpstream(_parent._source.SubscribeSafe(this));
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(_timerD, subscription));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _timerSerial);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private void CreateTimer(int id)
                 {
                     var m = new SingleAssignmentDisposable();
-                    _timerD.Disposable = m;
+                    Disposable.TrySetSerial(ref _timerSerial, m);
 
                     m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
                 }
@@ -477,7 +502,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly object _gate = new object();
                 private readonly AsyncLock _bufferGate = new AsyncLock();
-                private readonly SerialDisposable _bufferClosingSubscription = new SerialDisposable();
+                private IDisposable _bufferClosingSerialDisposable;
 
                 private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;
 
@@ -489,15 +514,22 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private IList<TSource> _buffer;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _buffer = new List<TSource>();
 
-                    var groupDisposable = StableCompositeDisposable.Create(_bufferClosingSubscription, source.SubscribeSafe(this));
+                    base.Run(source);
 
                     _bufferGate.Wait(CreateBufferClose);
+                }
 
-                    SetUpstream(groupDisposable);
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _bufferClosingSerialDisposable);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private void CreateBufferClose()
@@ -517,7 +549,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
 
                     var closingSubscription = new SingleAssignmentDisposable();
-                    _bufferClosingSubscription.Disposable = closingSubscription;
+                    Disposable.TrySetSerial(ref _bufferClosingSerialDisposable, closingSubscription);
                     closingSubscription.Disposable = bufferClose.SubscribeSafe(new BufferClosingObserver(this, closingSubscription));
                 }
 
@@ -616,14 +648,23 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private IList<TSource> _buffer;
 
+                private IDisposable _boundariesDisposable;
+
                 public void Run(Boundaries parent)
                 {
                     _buffer = new List<TSource>();
 
-                    var sourceSubscription = parent._source.SubscribeSafe(this);
-                    var boundariesSubscription = parent._bufferBoundaries.SubscribeSafe(new BufferClosingObserver(this));
+                    base.Run(parent._source);
+                    Disposable.SetSingle(ref _boundariesDisposable, parent._bufferBoundaries.SubscribeSafe(new BufferClosingObserver(this)));
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(sourceSubscription, boundariesSubscription));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _boundariesDisposable);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private sealed class BufferClosingObserver : IObserver<TBufferClosing>

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs

@@ -91,7 +91,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             private SerialDisposable _subscription;
 
-            public void Run(IObservable<TSource> source)
+            public override void Run(IObservable<TSource> source)
             {
                 _subscription = new SerialDisposable();
 

+ 20 - 16
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs

@@ -40,35 +40,41 @@ namespace System.Reactive.Linq.ObservableImpl
 
             private object _gate;
 
+            private IDisposable _firstDisposable;
+            private IDisposable _secondDisposable;
+
             public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
             {
                 _gate = new object();
 
-                var fstSubscription = new SingleAssignmentDisposable();
-                var sndSubscription = new SingleAssignmentDisposable();
-
-                var fstO = new FirstObserver(this, fstSubscription);
-                var sndO = new SecondObserver(this, sndSubscription);
+                var fstO = new FirstObserver(this);
+                var sndO = new SecondObserver(this);
 
                 fstO.Other = sndO;
                 sndO.Other = fstO;
 
-                fstSubscription.Disposable = first.SubscribeSafe(fstO);
-                sndSubscription.Disposable = second.SubscribeSafe(sndO);
+                Disposable.SetSingle(ref _firstDisposable, first.SubscribeSafe(fstO));
+                Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(sndO));
+            }
 
-                SetUpstream(StableCompositeDisposable.Create(fstSubscription, sndSubscription));
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _firstDisposable);
+                    Disposable.TryDispose(ref _secondDisposable);
+                }
+                base.Dispose(disposing);
             }
 
             private sealed class FirstObserver : IObserver<TFirst>
             {
                 private readonly _ _parent;
-                private readonly IDisposable _self;
                 private SecondObserver _other;
 
-                public FirstObserver(_ parent, IDisposable self)
+                public FirstObserver(_ parent)
                 {
                     _parent = parent;
-                    _self = self;
                 }
 
                 public SecondObserver Other { set { _other = value; } }
@@ -128,7 +134,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                         else
                         {
-                            _self.Dispose();
+                            Disposable.TryDispose(ref _parent._firstDisposable);
                         }
                     }
                 }
@@ -137,13 +143,11 @@ namespace System.Reactive.Linq.ObservableImpl
             private sealed class SecondObserver : IObserver<TSecond>
             {
                 private readonly _ _parent;
-                private readonly IDisposable _self;
                 private FirstObserver _other;
 
-                public SecondObserver(_ parent, IDisposable self)
+                public SecondObserver(_ parent)
                 {
                     _parent = parent;
-                    _self = self;
                 }
 
                 public FirstObserver Other { set { _other = value; } }
@@ -203,7 +207,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                         else
                         {
-                            _self.Dispose();
+                            Disposable.TryDispose(ref _parent._secondDisposable);
                         }
                     }
                 }

+ 10 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -642,9 +642,17 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _atEnd = false;
 
-                    _subscription = RunCore(parent);
+                    Disposable.SetSingle(ref _subscription, RunCore(parent));
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(_subscription, _delays));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _subscription);
+                        _delays.Dispose();
+                    }
+                    base.Dispose(disposing);
                 }
 
                 protected abstract IDisposable RunCore(TParent parent);

+ 11 - 9
Rx.NET/Source/src/System.Reactive/Linq/Observable/Finally.cs

@@ -25,27 +25,29 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             private readonly Action _finallyAction;
 
+            private IDisposable _sourceDisposable;
+
             public _(Action finallyAction, IObserver<TSource> observer)
                 : base(observer)
             {
                 _finallyAction = finallyAction;
             }
 
-            public void Run(IObservable<TSource> source)
+            public override void Run(IObservable<TSource> source)
             {
-                var subscription = source.SubscribeSafe(this);
+                Disposable.SetSingle(ref _sourceDisposable, source.SubscribeSafe(this));
+            }
 
-                SetUpstream(Disposable.Create(() =>
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
                 {
-                    try
-                    {
-                        subscription.Dispose();
-                    }
-                    finally
+                    if (Disposable.TryDispose(ref _sourceDisposable))
                     {
                         _finallyAction();
                     }
-                }));
+                }
+                base.Dispose(disposing);
             }
         }
     }

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupBy.cs

@@ -54,7 +54,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
             }
 
-            public void Run(IObservable<TSource> source)
+            public override void Run(IObservable<TSource> source)
             {
                 var sourceSubscription = new SingleAssignmentDisposable();
                 _refCountDisposable = new RefCountDisposable(sourceSubscription);

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs

@@ -58,7 +58,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _durationSelector = parent._durationSelector;
             }
 
-            public void Run(IObservable<TSource> source)
+            public override void Run(IObservable<TSource> source)
             {
                 _groupDisposable.Add(source.SubscribeSafe(this));
 

+ 13 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Multicast.cs

@@ -26,6 +26,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TResult>
         {
+            private IDisposable _connection;
+
             public _(IObserver<TResult> observer)
                 : base(observer)
             {
@@ -37,7 +39,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 var connectable = default(IConnectableObservable<TIntermediate>);
                 try
                 {
-                    var subject =parent._subjectSelector();
+                    var subject = parent._subjectSelector();
                     connectable = new ConnectableObservable<TSource, TIntermediate>(parent._source, subject);
                     observable = parent._selector(connectable);
                 }
@@ -47,10 +49,17 @@ namespace System.Reactive.Linq.ObservableImpl
                     return;
                 }
 
-                var subscription = observable.SubscribeSafe(this);
-                var connection = connectable.Connect();
+                base.Run(observable);
+                Disposable.SetSingle(ref _connection, connectable.Connect());
+            }
 
-                SetUpstream(StableCompositeDisposable.Create(subscription, connection));
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _connection);
+                }
+                base.Dispose(disposing);
             }
         }
     }

+ 29 - 21
Rx.NET/Source/src/System.Reactive/Linq/Observable/Sample.cs

@@ -31,8 +31,8 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            private IDisposable _sourceSubscription;
-            private IDisposable _samplerSubscription;
+            private IDisposable _sourceDisposable;
+            private IDisposable _samplerDisposable;
 
             private bool _hasValue;
             private TSource _value;
@@ -41,15 +41,19 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void Run(Sample<TSource, TSample> parent)
             {
-                var sourceSubscription = new SingleAssignmentDisposable();
-                _sourceSubscription = sourceSubscription;
-                sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
+                Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));
 
-                var samplerSubscription = new SingleAssignmentDisposable();
-                _samplerSubscription = samplerSubscription;
-                samplerSubscription.Disposable = parent._sampler.SubscribeSafe(new SampleObserver(this));
+                Disposable.SetSingle(ref _samplerDisposable, parent._sampler.SubscribeSafe(new SampleObserver(this)));
+            }
 
-                SetUpstream(StableCompositeDisposable.Create(_sourceSubscription, _samplerSubscription));
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _sourceDisposable);
+                    Disposable.TryDispose(ref _samplerDisposable);
+                }
+                base.Dispose(disposing);
             }
 
             public override void OnNext(TSource value)
@@ -78,7 +82,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     if (_samplerAtEnd)
                         ForwardOnCompleted();
                     else
-                        _sourceSubscription.Dispose();
+                        Disposable.TryDispose(ref _sourceDisposable);
                 }
             }
 
@@ -134,7 +138,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             _parent.ForwardOnCompleted();
                         }
                         else
-                            _parent._samplerSubscription.Dispose();
+                            Disposable.TryDispose(ref _parent._samplerDisposable);
                     }
                 }
             }
@@ -167,7 +171,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            private IDisposable _sourceSubscription;
+            private IDisposable _sourceDisposable;
 
             private bool _hasValue;
             private TSource _value;
@@ -175,14 +179,18 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void Run(Sample<TSource> parent)
             {
-                var sourceSubscription = new SingleAssignmentDisposable();
-                _sourceSubscription = sourceSubscription;
-                sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
-
-                SetUpstream(StableCompositeDisposable.Create(
-                    sourceSubscription,
-                    parent._scheduler.SchedulePeriodic(parent._interval, Tick)
-                ));
+                Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));
+
+                SetUpstream(parent._scheduler.SchedulePeriodic(parent._interval, Tick));
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _sourceDisposable);
+                }
+                base.Dispose(disposing);
             }
 
             private void Tick()
@@ -224,7 +232,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 lock (_gate)
                 {
                     _atEnd = true;
-                    _sourceSubscription.Dispose();
+                    Disposable.TryDispose(ref _sourceDisposable);
                 }
             }
         }

+ 48 - 12
Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs

@@ -48,7 +48,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private bool _isStopped;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _isStopped = false;
 
@@ -212,7 +212,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private bool _isStopped;
                 private int _index;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _isStopped = false;
 
@@ -554,11 +554,20 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private volatile int _count;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _count = 1;
 
-                    SetUpstream(StableCompositeDisposable.Create(source.SubscribeSafe(this), _cancel));
+                    base.Run(source);
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        _cancel.Dispose();
+                    }
+                    base.Dispose(disposing);
                 }
 
                 public override void OnNext(TSource value)
@@ -702,11 +711,20 @@ namespace System.Reactive.Linq.ObservableImpl
                 private volatile int _count;
                 private int _index;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _count = 1;
 
-                    SetUpstream(StableCompositeDisposable.Create(source.SubscribeSafe(this), _cancel));
+                    base.Run(source);
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        _cancel.Dispose();
+                    }
+                    base.Dispose(disposing);
                 }
 
                 public override void OnNext(TSource value)
@@ -853,7 +871,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private bool _isStopped;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _isStopped = false;
 
@@ -1088,7 +1106,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private bool _isStopped;
                 private int _index;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _isStopped = false;
 
@@ -1491,11 +1509,20 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private volatile int _count;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _count = 1;
 
-                    SetUpstream(StableCompositeDisposable.Create(source.SubscribeSafe(this), _cancel));
+                    base.Run(source);
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        _cancel.Dispose();
+                    }
+                    base.Dispose(disposing);
                 }
 
                 public override void OnNext(TSource value)
@@ -1612,11 +1639,20 @@ namespace System.Reactive.Linq.ObservableImpl
                 private volatile int _count;
                 private int _index;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _count = 1;
 
-                    SetUpstream(StableCompositeDisposable.Create(source.SubscribeSafe(this), _cancel));
+                    base.Run(source);
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        _cancel.Dispose();
+                    }
+                    base.Dispose(disposing);
                 }
 
                 public override void OnNext(TSource value)

+ 38 - 10
Rx.NET/Source/src/System.Reactive/Linq/Observable/SequenceEqual.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Reactive.Disposables;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -42,6 +43,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 private Queue<TSource> _ql;
                 private Queue<TSource> _qr;
 
+                private IDisposable _second;
+
                 public void Run(Observable parent)
                 {
                     _gate = new object();
@@ -50,11 +53,17 @@ namespace System.Reactive.Linq.ObservableImpl
                     _ql = new Queue<TSource>();
                     _qr = new Queue<TSource>();
 
-                    SetUpstream(StableCompositeDisposable.Create
-                    (
-                        parent._first.SubscribeSafe(new FirstObserver(this)),
-                        parent._second.SubscribeSafe(new SecondObserver(this))
-                    ));
+                    SetUpstream(parent._first.SubscribeSafe(new FirstObserver(this)));
+                    Disposable.SetSingle(ref _second, parent._second.SubscribeSafe(new SecondObserver(this)));
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _second);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private sealed class FirstObserver : IObserver<TSource>
@@ -232,6 +241,13 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private IEnumerator<TSource> _enumerator;
 
+                private static readonly IEnumerator<TSource> DisposedEnumerator = MakeDisposedEnumerator();
+
+                private static IEnumerator<TSource> MakeDisposedEnumerator()
+                {
+                    yield break;
+                }
+
                 public void Run(Enumerable parent)
                 {
                     //
@@ -243,7 +259,13 @@ namespace System.Reactive.Linq.ObservableImpl
                     //
                     try
                     {
-                        _enumerator = parent._second.GetEnumerator();
+                        var enumerator = parent._second.GetEnumerator();
+
+                        if (Interlocked.CompareExchange(ref _enumerator, enumerator, null) != null)
+                        {
+                            enumerator.Dispose();
+                            return;
+                        }
                     }
                     catch (Exception exception)
                     {
@@ -252,10 +274,16 @@ namespace System.Reactive.Linq.ObservableImpl
                         return;
                     }
 
-                    SetUpstream(StableCompositeDisposable.Create(
-                        parent._first.SubscribeSafe(this),
-                        _enumerator
-                    ));
+                    SetUpstream(parent._first.SubscribeSafe(this));
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Interlocked.Exchange(ref _enumerator, DisposedEnumerator)?.Dispose();
+                    }
+                    base.Dispose(disposing);
                 }
 
                 public override void OnNext(TSource value)

+ 13 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs

@@ -99,11 +99,21 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 }
 
+                private IDisposable _sourceDisposable;
+
                 public void Run(Time parent)
                 {
-                    var t = parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick());
-                    var d = parent._source.SubscribeSafe(this);
-                    SetUpstream(StableCompositeDisposable.Create(t, d));
+                    SetUpstream(parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick()));
+                    Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _sourceDisposable);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private IDisposable Tick()

+ 13 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

@@ -172,11 +172,21 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
+            private IDisposable _task;
+
             public void Run(SkipUntil<TSource> parent)
             {
-                var t = parent._scheduler.Schedule(this, parent._startTime, (_, state) => state.Tick());
-                var d = parent._source.SubscribeSafe(this);
-                SetUpstream(StableCompositeDisposable.Create(t, d));
+                Disposable.SetSingle(ref _task, parent._scheduler.Schedule(this, parent._startTime, (_, state) => state.Tick()));
+                base.Run(parent._source);
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _task);
+                }
+                base.Dispose(disposing);
             }
 
             private IDisposable Tick()

+ 14 - 9
Rx.NET/Source/src/System.Reactive/Linq/Observable/Switch.cs

@@ -28,24 +28,29 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            private IDisposable _subscription;
-            private SerialDisposable _innerSubscription;
+            private IDisposable _sourceDisposable;
+            private IDisposable _innerSerialDisposable;
             private bool _isStopped;
             private ulong _latest;
             private bool _hasLatest;
 
             public void Run(Switch<TSource> parent)
             {
-                _innerSubscription = new SerialDisposable();
                 _isStopped = false;
                 _latest = 0UL;
                 _hasLatest = false;
 
-                var subscription = new SingleAssignmentDisposable();
-                _subscription = subscription;
-                subscription.Disposable = parent._sources.SubscribeSafe(this);
+                Disposable.SetSingle(ref _sourceDisposable, parent._sources.SubscribeSafe(this));
+            }
 
-                SetUpstream(StableCompositeDisposable.Create(_subscription, _innerSubscription));
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _innerSerialDisposable);
+                    Disposable.TryDispose(ref _sourceDisposable);
+                }
+                base.Dispose(disposing);
             }
 
             public override void OnNext(IObservable<TSource> value)
@@ -58,7 +63,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
 
                 var d = new SingleAssignmentDisposable();
-                _innerSubscription.Disposable = d;
+                Disposable.TrySetSerial(ref _innerSerialDisposable, d);
                 d.Disposable = value.SubscribeSafe(new InnerObserver(this, id, d));
             }
 
@@ -72,7 +77,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 lock (_gate)
                 {
-                    _subscription.Dispose();
+                    Disposable.TryDispose(ref _sourceDisposable);
 
                     _isStopped = true;
                     if (!_hasLatest)

+ 13 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs

@@ -108,13 +108,23 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private object _gate;
 
+                private IDisposable _task;
+
                 public void Run(Time parent)
                 {
                     _gate = new object();
 
-                    var t = parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick());
-                    var d = parent._source.SubscribeSafe(this);
-                    SetUpstream(StableCompositeDisposable.Create(t, d));
+                    Disposable.SetSingle(ref _task, parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick()));
+                    base.Run(parent._source);
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _task);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private IDisposable Tick()

+ 29 - 20
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs

@@ -41,17 +41,22 @@ namespace System.Reactive.Linq.ObservableImpl
                     _queue = new Queue<TSource>();
                 }
 
-                private SingleAssignmentDisposable _subscription;
-                private SingleAssignmentDisposable _loop;
+                private IDisposable _sourceDisposable;
+                private IDisposable _loopDisposable;
 
                 public void Run()
                 {
-                    _subscription = new SingleAssignmentDisposable();
-                    _loop = new SingleAssignmentDisposable();
-
-                    _subscription.Disposable = _parent._source.SubscribeSafe(this);
+                    Disposable.SetSingle(ref _sourceDisposable, _parent._source.SubscribeSafe(this));
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(_subscription, _loop));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _loopDisposable);
+                        Disposable.TryDispose(ref _sourceDisposable);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 public override void OnNext(TSource value)
@@ -63,13 +68,13 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnCompleted()
                 {
-                    _subscription.Dispose();
+                    Disposable.TryDispose(ref _sourceDisposable);
 
                     var longRunning = _parent._loopScheduler.AsLongRunning();
                     if (longRunning != null)
-                        _loop.Disposable = longRunning.ScheduleLongRunning(Loop);
+                        Disposable.SetSingle(ref _loopDisposable, longRunning.ScheduleLongRunning(Loop));
                     else
-                        _loop.Disposable = _parent._loopScheduler.Schedule(LoopRec);
+                        Disposable.SetSingle(ref _loopDisposable, _parent._loopScheduler.Schedule(LoopRec));
                 }
 
                 private void LoopRec(Action recurse)
@@ -140,19 +145,23 @@ namespace System.Reactive.Linq.ObservableImpl
                     _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
                 }
 
-                private SingleAssignmentDisposable _subscription;
-                private SingleAssignmentDisposable _loop;
+                private IDisposable _sourceDisposable;
+                private IDisposable _loopDisposable;
                 private IStopwatch _watch;
 
                 public void Run()
                 {
-                    _subscription = new SingleAssignmentDisposable();
-                    _loop = new SingleAssignmentDisposable();
-
                     _watch = _parent._scheduler.StartStopwatch();
-                    _subscription.Disposable = _parent._source.SubscribeSafe(this);
+                    Disposable.SetSingle(ref _sourceDisposable, _parent._source.SubscribeSafe(this));
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(_subscription, _loop));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _sourceDisposable);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 public override void OnNext(TSource value)
@@ -164,16 +173,16 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnCompleted()
                 {
-                    _subscription.Dispose();
+                    Disposable.TryDispose(ref _sourceDisposable);
 
                     var now = _watch.Elapsed;
                     Trim(now);
 
                     var longRunning = _parent._loopScheduler.AsLongRunning();
                     if (longRunning != null)
-                        _loop.Disposable = longRunning.ScheduleLongRunning(Loop);
+                        Disposable.SetSingle(ref _loopDisposable, longRunning.ScheduleLongRunning(Loop));
                     else
-                        _loop.Disposable = _parent._loopScheduler.Schedule(LoopRec);
+                        Disposable.SetSingle(ref _loopDisposable, _parent._loopScheduler.Schedule(LoopRec));
                 }
 
                 private void LoopRec(Action recurse)

+ 13 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

@@ -147,11 +147,21 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
+            private IDisposable _sourceDisposable;
+
             public void Run(TakeUntil<TSource> parent)
             {
-                var t = parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick());
-                var d = parent._source.SubscribeSafe(this);
-                SetUpstream(StableCompositeDisposable.Create(t, d));
+                SetUpstream(parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick()));
+                Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _sourceDisposable);
+                }
+                base.Dispose(disposing);
             }
 
             private IDisposable Tick()

+ 27 - 15
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs

@@ -39,20 +39,26 @@ namespace System.Reactive.Linq.ObservableImpl
             private object _gate;
             private TSource _value;
             private bool _hasValue;
-            private SerialDisposable _cancelable;
+            private IDisposable _serialCancelable;
             private ulong _id;
 
-            public void Run(IObservable<TSource> source)
+            public override void Run(IObservable<TSource> source)
             {
                 _gate = new object();
                 _value = default(TSource);
                 _hasValue = false;
-                _cancelable = new SerialDisposable();
                 _id = 0UL;
 
-                var subscription = source.SubscribeSafe(this);
+                base.Run(source);
+            }
 
-                SetUpstream(StableCompositeDisposable.Create(subscription, _cancelable));
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _serialCancelable);
+                }
+                base.Dispose(disposing);
             }
 
             public override void OnNext(TSource value)
@@ -66,7 +72,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     currentid = _id;
                 }
                 var d = new SingleAssignmentDisposable();
-                _cancelable.Disposable = d;
+                Disposable.TrySetSerial(ref _serialCancelable, d);
                 d.Disposable = _scheduler.Schedule(currentid, _dueTime, Propagate);
             }
 
@@ -84,7 +90,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public override void OnError(Exception error)
             {
-                _cancelable.Dispose();
+                Disposable.TryDispose(ref _serialCancelable);
 
                 lock (_gate)
                 {
@@ -97,7 +103,7 @@ namespace System.Reactive.Linq.ObservableImpl
             
             public override void OnCompleted()
             {
-                _cancelable.Dispose();
+                Disposable.TryDispose(ref _serialCancelable);
 
                 lock (_gate)
                 {
@@ -141,7 +147,7 @@ namespace System.Reactive.Linq.ObservableImpl
             private object _gate;
             private TSource _value;
             private bool _hasValue;
-            private SerialDisposable _cancelable;
+            private IDisposable _serialCancelable;
             private ulong _id;
 
             public void Run(Throttle<TSource, TThrottle> parent)
@@ -149,12 +155,18 @@ namespace System.Reactive.Linq.ObservableImpl
                 _gate = new object();
                 _value = default(TSource);
                 _hasValue = false;
-                _cancelable = new SerialDisposable();
                 _id = 0UL;
 
-                var subscription = parent._source.SubscribeSafe(this);
+                base.Run(parent._source);
+            }
 
-                SetUpstream(StableCompositeDisposable.Create(subscription, _cancelable));
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _serialCancelable);
+                }
+                base.Dispose(disposing);
             }
 
             public override void OnNext(TSource value)
@@ -184,13 +196,13 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
 
                 var d = new SingleAssignmentDisposable();
-                _cancelable.Disposable = d;
+                Disposable.TrySetSerial(ref _serialCancelable, d);
                 d.Disposable = throttle.SubscribeSafe(new ThrottleObserver(this, value, currentid, d));
             }
 
             public override void OnError(Exception error)
             {
-                _cancelable.Dispose();
+                Disposable.TryDispose(ref _serialCancelable);
 
                 lock (_gate)
                 {
@@ -203,7 +215,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public override void OnCompleted()
             {
-                _cancelable.Dispose();
+                Disposable.TryDispose(ref _serialCancelable);
 
                 lock (_gate)
                 {

+ 28 - 13
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

@@ -49,7 +49,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     _scheduler = parent._scheduler;
                 }
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     CreateTimer(0L);
 
@@ -152,7 +152,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly IObservable<TSource> _other;
 
                 private readonly object _gate = new object();
-                private readonly SerialDisposable _subscription = new SerialDisposable();
+                private IDisposable _serialDisposable;
 
                 public _(IObservable<TSource> other, IObserver<TSource> observer)
                     : base(observer)
@@ -166,15 +166,22 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     var original = new SingleAssignmentDisposable();
 
-                    _subscription.Disposable = original;
+                    _serialDisposable = original;
 
                     _switched = false;
 
-                    var timer = parent._scheduler.Schedule(this, parent._dueTime, (_, state) => state.Timeout());
+                    SetUpstream(parent._scheduler.Schedule(this, parent._dueTime, (_, state) => state.Timeout()));
 
                     original.Disposable = parent._source.SubscribeSafe(this);
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(_subscription, timer));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _serialDisposable);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private IDisposable Timeout()
@@ -188,7 +195,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
 
                     if (timerWins)
-                        _subscription.Disposable = _other.SubscribeSafe(GetForwarder());
+                        Disposable.TrySetSerial(ref _serialDisposable, _other.SubscribeSafe(GetForwarder()));
 
                     return Disposable.Empty;
                 }
@@ -262,8 +269,8 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly IObservable<TSource> _other;
 
             private readonly object _gate = new object();
-            private readonly SerialDisposable _subscription = new SerialDisposable();
-            private readonly SerialDisposable _timer = new SerialDisposable();
+            private IDisposable _sourceDisposable;
+            private IDisposable _timerDisposable;
 
             public _(Timeout<TSource, TTimeout> parent, IObserver<TSource> observer)
                 : base(observer)
@@ -279,7 +286,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 var original = new SingleAssignmentDisposable();
 
-                _subscription.Disposable = original;
+                _sourceDisposable = original;
 
                 _id = 0UL;
                 _switched = false;
@@ -287,8 +294,16 @@ namespace System.Reactive.Linq.ObservableImpl
                 SetTimer(parent._firstTimeout);
 
                 original.Disposable = parent._source.SubscribeSafe(this);
+            }
 
-                SetUpstream(StableCompositeDisposable.Create(_subscription, _timer));
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _sourceDisposable);
+                    Disposable.TryDispose(ref _timerDisposable);
+                }
+                base.Dispose(disposing);
             }
 
             public override void OnNext(TSource value)
@@ -333,7 +348,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 var myid = _id;
 
                 var d = new SingleAssignmentDisposable();
-                _timer.Disposable = d;
+                Disposable.TrySetSerial(ref _timerDisposable, d);
                 d.Disposable = timeout.SubscribeSafe(new TimeoutObserver(this, myid, d));
             }
 
@@ -353,7 +368,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 public void OnNext(TTimeout value)
                 {
                     if (TimerWins())
-                        _parent._subscription.Disposable = _parent._other.SubscribeSafe(_parent.GetForwarder());
+                        Disposable.TrySetSerial(ref _parent._sourceDisposable,  _parent._other.SubscribeSafe(_parent.GetForwarder()));
 
                     _self.Dispose();
                 }
@@ -369,7 +384,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 public void OnCompleted()
                 {
                     if (TimerWins())
-                        _parent._subscription.Disposable = _parent._other.SubscribeSafe(_parent.GetForwarder());
+                        Disposable.TrySetSerial(ref _parent._sourceDisposable, _parent._other.SubscribeSafe(_parent.GetForwarder()));
                 }
 
                 private bool TimerWins()

+ 14 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Using.cs

@@ -29,25 +29,35 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
+            private IDisposable _disposable;
+
             public void Run(Using<TSource, TResource> parent)
             {
                 var source = default(IObservable<TSource>);
-                var disposable = Disposable.Empty;
                 try
                 {
                     var resource = parent._resourceFactory();
                     if (resource != null)
-                        disposable = resource;
+                        Disposable.SetSingle(ref _disposable, resource);
                     source = parent._observableFactory(resource);
                 }
                 catch (Exception exception)
                 {
-                    SetUpstream(StableCompositeDisposable.Create(Observable.Throw<TSource>(exception).SubscribeSafe(this), disposable));
+                    SetUpstream(Observable.Throw<TSource>(exception).SubscribeSafe(this));
 
                     return;
                 }
 
-                SetUpstream(StableCompositeDisposable.Create(source.SubscribeSafe(this), disposable));
+                base.Run(source);
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _disposable);
+                }
+                base.Dispose(disposing);
             }
         }
     }

+ 3 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs

@@ -48,7 +48,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private int _n;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _n = 0;
 
@@ -392,7 +392,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private RefCountDisposable _refCountDisposable;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _n = 0;
 
@@ -518,7 +518,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private ISubject<TSource> _window;
                 private RefCountDisposable _refCountDisposable;
 
-                public void Run(IObservable<TSource> source)
+                public override void Run(IObservable<TSource> source)
                 {
                     _window = new Subject<TSource>();
 

+ 15 - 10
Rx.NET/Source/src/System.Reactive/Linq/Observable/WithLatestFrom.cs

@@ -39,20 +39,27 @@ namespace System.Reactive.Linq.ObservableImpl
 
             private object _latestGate;
 
+            private IDisposable _secondDisposable;
+
             public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
             {
                 _gate = new object();
                 _latestGate = new object();
 
-                var sndSubscription = new SingleAssignmentDisposable();
-
                 var fstO = new FirstObserver(this);
-                var sndO = new SecondObserver(this, sndSubscription);
+                var sndO = new SecondObserver(this);
 
-                sndSubscription.Disposable = second.SubscribeSafe(sndO);
-                var fstSubscription = first.SubscribeSafe(fstO);
+                Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(sndO));
+                SetUpstream(first.SubscribeSafe(fstO));
+            }
 
-                SetUpstream(StableCompositeDisposable.Create(fstSubscription, sndSubscription));
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _secondDisposable);
+                }
+                base.Dispose(disposing);
             }
 
             private sealed class FirstObserver : IObserver<TFirst>
@@ -119,17 +126,15 @@ namespace System.Reactive.Linq.ObservableImpl
             private sealed class SecondObserver : IObserver<TSecond>
             {
                 private readonly _ _parent;
-                private readonly IDisposable _self;
 
-                public SecondObserver(_ parent, IDisposable self)
+                public SecondObserver(_ parent)
                 {
                     _parent = parent;
-                    _self = self;
                 }
 
                 public void OnCompleted()
                 {
-                    _self.Dispose();
+                    Disposable.TryDispose(ref _parent._secondDisposable);
                 }
 
                 public void OnError(Exception error)

+ 23 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs

@@ -6,6 +6,7 @@ using System.Collections;
 using System.Collections.Generic;
 using System.Linq;
 using System.Reactive.Disposables;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -262,6 +263,13 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private IEnumerator<TSecond> _rightEnumerator;
 
+                private static readonly IEnumerator<TSecond> DisposedEnumerator = MakeDisposedEnumerator();
+
+                private static IEnumerator<TSecond> MakeDisposedEnumerator()
+                {
+                    yield break;
+                }
+
                 public void Run(IObservable<TFirst> first, IEnumerable<TSecond> second)
                 {
                     //
@@ -273,7 +281,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     //
                     try
                     {
-                        _rightEnumerator = second.GetEnumerator();
+                        var enumerator = second.GetEnumerator();
+                        if (Interlocked.CompareExchange(ref _rightEnumerator, enumerator, null) != null)
+                        {
+                            enumerator.Dispose();
+                            return;
+                        }
                     }
                     catch (Exception exception)
                     {
@@ -282,9 +295,16 @@ namespace System.Reactive.Linq.ObservableImpl
                         return;
                     }
 
-                    var leftSubscription = first.SubscribeSafe(this);
+                    base.Run(first);
+                }
 
-                    SetUpstream(StableCompositeDisposable.Create(leftSubscription, _rightEnumerator));
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
+                    }
+                    base.Dispose(disposing);
                 }
 
                 public override void OnNext(TFirst value)