Pārlūkot izejas kodu

Inline Disposable.Create (#580)

David Karnok 7 gadi atpakaļ
vecāks
revīzija
6ecc899a4b

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs

@@ -170,7 +170,7 @@ namespace System.Reactive.Concurrency
                 EnsureThread();
             }
 
-            return Disposable.Create(si.Cancel);
+            return si;
         }
 
         /// <summary>

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs

@@ -148,7 +148,7 @@ namespace System.Reactive.Concurrency
             si = new ScheduledItem<DateTimeOffset, TState>(this, state, run, dueTime, Comparer);
             queue.Enqueue(si);
 
-            return Disposable.Create(si.Cancel);
+            return si;
         }
     }
 }

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.cs

@@ -420,7 +420,7 @@ namespace System.Reactive.Concurrency
                 queue.Enqueue(si);
             }
 
-            return Disposable.Create(si.Cancel);
+            return si;
         }
     }
 }

+ 16 - 1
Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

@@ -39,7 +39,22 @@ namespace System.Reactive
             if (_longRunning != null)
             {
                 _dispatcherEvent = new SemaphoreSlim(0);
-                _dispatcherEventRelease = Disposable.Create(() => _dispatcherEvent.Release());
+                _dispatcherEventRelease = new SemaphoreSlimRelease(_dispatcherEvent);
+            }
+        }
+
+        sealed class SemaphoreSlimRelease : IDisposable
+        {
+            SemaphoreSlim _dispatcherEvent;
+
+            public SemaphoreSlimRelease(SemaphoreSlim dispatcherEvent)
+            {
+                Volatile.Write(ref _dispatcherEvent, dispatcherEvent);
+            }
+
+            public void Dispose()
+            {
+                Interlocked.Exchange(ref _dispatcherEvent, null)?.Release();
             }
         }
 

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -318,7 +318,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 protected void ScheduleDrain()
                 {
                     _stop = new CancellationTokenSource();
-                    Disposable.TrySetSerial(ref _cancelable, Disposable.Create(_stop.Cancel));
+                    Disposable.TrySetSerial(ref _cancelable, new CancellationDisposable(_stop));
 
                     _scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
                 }

+ 16 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEventPattern.cs

@@ -5,6 +5,7 @@
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Reflection;
+using System.Threading;
 
 //
 // BREAKING CHANGE v2 > v1.x - FromEvent[Pattern] now has an implicit SubscribeOn and Publish operation.
@@ -116,17 +117,29 @@ namespace System.Reactive.Linq.ObservableImpl
                     throw tie.InnerException;
                 }
 
-                return Disposable.Create(() =>
+                return new RemoveHandlerDisposable(removeHandler);
+            }
+
+            sealed class RemoveHandlerDisposable : IDisposable
+            {
+                Action _removeHandler;
+
+                public RemoveHandlerDisposable(Action removeHandler)
+                {
+                    Volatile.Write(ref this._removeHandler, removeHandler);
+                }
+
+                public void Dispose()
                 {
                     try
                     {
-                        removeHandler();
+                        Interlocked.Exchange(ref _removeHandler, null)?.Invoke();
                     }
                     catch (TargetInvocationException tie)
                     {
                         throw tie.InnerException;
                     }
-                });
+                }
             }
 
             private Action AddHandlerCore(Delegate handler)

+ 32 - 15
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -23,41 +23,58 @@ namespace System.Reactive.Linq.ObservableImpl
             _connectableSubscription = default(IDisposable);
         }
 
-        protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
+        protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
 
-        protected override void Run(_ sink) => sink.Run(this);
+        protected override void Run(_ sink) => sink.Run();
 
         internal sealed class _ : IdentitySink<TSource>
         {
-            public _(IObserver<TSource> observer)
+            readonly RefCount<TSource> _parent;
+
+            public _(IObserver<TSource> observer, RefCount<TSource> parent)
                 : base(observer)
             {
+                this._parent = parent;
             }
 
-            public void Run(RefCount<TSource> parent)
+            public void Run()
             {
-                var subscription = parent._source.SubscribeSafe(this);
+                base.Run(_parent._source);
 
-                lock (parent._gate)
+                lock (_parent._gate)
                 {
-                    if (++parent._count == 1)
+                    if (++_parent._count == 1)
                     {
-                        parent._connectableSubscription = parent._source.Connect();
+                        // We need to set _connectableSubscription to something
+                        // before Connect because if Connect terminates synchronously,
+                        // Dispose(bool) gets executed and will try to dispose
+                        // _connectableSubscription of null.
+                        // ?.Dispose() is no good because the dispose action has to be
+                        // executed anyway.
+                        // We can't inline SAD either because the IDisposable of Connect
+                        // may belong to the wrong connection.
+                        var sad = new SingleAssignmentDisposable();
+                        _parent._connectableSubscription = sad;
+
+                        sad.Disposable = _parent._source.Connect();
                     }
                 }
+            }
 
-                SetUpstream(Disposable.Create(() =>
-                {
-                    subscription.Dispose();
+            protected override void Dispose(bool disposing)
+            {
+                base.Dispose(disposing);
 
-                    lock (parent._gate)
+                if (disposing)
+                {
+                    lock (_parent._gate)
                     {
-                        if (--parent._count == 0)
+                        if (--_parent._count == 0)
                         {
-                            parent._connectableSubscription.Dispose();
+                            _parent._connectableSubscription.Dispose();
                         }
                     }
-                }));
+                }
             }
         }
     }

+ 72 - 35
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs

@@ -35,44 +35,61 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly Func<TFirst, TSecond, TResult> _resultSelector;
 
+                private readonly object _gate;
+
+                private readonly FirstObserver _firstObserver;
+                private IDisposable _firstDisposable;
+
+                private readonly SecondObserver _secondObserver;
+                private IDisposable _secondDisposable;
+
                 public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
                     : base(observer)
                 {
+                    _gate = new object();
+
+                    _firstObserver = new FirstObserver(this);
+                    _secondObserver = new SecondObserver(this);
+
+                    _firstObserver.Other = _secondObserver;
+                    _secondObserver.Other = _firstObserver;
+
                     _resultSelector = resultSelector;
                 }
 
-                private object _gate;
-
                 public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
                 {
-                    _gate = new object();
-
-                    var fstSubscription = new SingleAssignmentDisposable();
-                    var sndSubscription = new SingleAssignmentDisposable();
-
-                    var fstO = new FirstObserver(this, fstSubscription);
-                    var sndO = new SecondObserver(this, sndSubscription);
-
-                    fstO.Other = sndO;
-                    sndO.Other = fstO;
+                    Disposable.SetSingle(ref _firstDisposable, first.SubscribeSafe(_firstObserver));
+                    Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(_secondObserver));
+                }
 
-                    fstSubscription.Disposable = first.SubscribeSafe(fstO);
-                    sndSubscription.Disposable = second.SubscribeSafe(sndO);
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _firstDisposable);
+                        Disposable.TryDispose(ref _secondDisposable);
 
-                    SetUpstream(StableCompositeDisposable.Create(fstSubscription, sndSubscription, fstO, sndO));
+                        // clearing the queue should happen under the lock
+                        // as they are plain Queue<T>s, not concurrent queues.
+                        lock (_gate)
+                        {
+                            _firstObserver.Dispose();
+                            _secondObserver.Dispose();
+                        }
+                    }
+                    base.Dispose(disposing);
                 }
 
                 private sealed class FirstObserver : IObserver<TFirst>, IDisposable
                 {
                     private readonly _ _parent;
-                    private readonly IDisposable _self;
                     private SecondObserver _other;
                     private Queue<TFirst> _queue;
 
-                    public FirstObserver(_ parent, IDisposable self)
+                    public FirstObserver(_ parent)
                     {
                         _parent = parent;
-                        _self = self;
                         _queue = new Queue<TFirst>();
                     }
 
@@ -136,7 +153,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             }
                             else
                             {
-                                _self.Dispose();
+                                Disposable.TryDispose(ref _parent._firstDisposable);
                             }
                         }
                     }
@@ -150,14 +167,12 @@ namespace System.Reactive.Linq.ObservableImpl
                 private sealed class SecondObserver : IObserver<TSecond>, IDisposable
                 {
                     private readonly _ _parent;
-                    private readonly IDisposable _self;
                     private FirstObserver _other;
                     private Queue<TSecond> _queue;
 
-                    public SecondObserver(_ parent, IDisposable self)
+                    public SecondObserver(_ parent)
                     {
                         _parent = parent;
-                        _self = self;
                         _queue = new Queue<TSecond>();
                     }
 
@@ -221,7 +236,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             }
                             else
                             {
-                                _self.Dispose();
+                                Disposable.TryDispose(ref _parent._secondDisposable);
                             }
                         }
                     }
@@ -533,17 +548,21 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             private readonly Zip<TSource> _parent;
 
+            private readonly object _gate;
+
             public _(Zip<TSource> parent, IObserver<IList<TSource>> observer)
                 : base(observer)
             {
+                _gate = new object();
                 _parent = parent;
             }
 
-            private object _gate;
             private Queue<TSource>[] _queues;
             private bool[] _isDone;
             private IDisposable[] _subscriptions;
 
+            private static readonly IDisposable[] Disposed = new IDisposable[0];
+
             public void Run()
             {
                 var srcs = _parent._sources.ToArray();
@@ -556,22 +575,40 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 _isDone = new bool[N];
 
-                _subscriptions = new SingleAssignmentDisposable[N];
+                var subscriptions = new IDisposable[N];
 
-                _gate = new object();
-
-                for (int i = 0; i < N; i++)
+                if (Interlocked.CompareExchange(ref _subscriptions, subscriptions, null) == null)
                 {
-                    var j = i;
+                    for (int i = 0; i < N; i++)
+                    {
+                        var o = new SourceObserver(this, i);
+                        Disposable.SetSingle(ref subscriptions[i], srcs[i].SubscribeSafe(o));
+                    }
+                }
+            }
 
-                    var d = new SingleAssignmentDisposable();
-                    _subscriptions[j] = d;
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    var subscriptions = Interlocked.Exchange(ref _subscriptions, Disposed);
+                    if (subscriptions != null)
+                    {
+                        for (int i = 0; i < subscriptions.Length; i++)
+                        {
+                            Disposable.TryDispose(ref subscriptions[i]);
+                        }
 
-                    var o = new SourceObserver(this, j);
-                    d.Disposable = srcs[j].SubscribeSafe(o);
+                        lock (_gate)
+                        {
+                            foreach (var q in _queues)
+                            {
+                                q.Clear();
+                            }
+                        }
+                    }
                 }
-
-                SetUpstream(new CompositeDisposable(_subscriptions) { Disposable.Create(() => { foreach (var q in _queues) q.Clear(); }) });
+                base.Dispose(disposing);
             }
 
             private void OnNext(int index, TSource value)