Explorar o código

Inline SerialDisposable usages (#593)

David Karnok %!s(int64=7) %!d(string=hai) anos
pai
achega
50bbdab3cb

+ 3 - 5
Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs

@@ -66,7 +66,7 @@ namespace System.Reactive.Concurrency
         /// <summary>
         /// Disposable that always holds the timer to dispatch the first element in the queue.
         /// </summary>
-        private readonly SerialDisposable _nextTimer;
+        private IDisposable _nextTimer;
 
         /// <summary>
         /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
@@ -109,8 +109,6 @@ namespace System.Reactive.Concurrency
             _queue = new SchedulerQueue<TimeSpan>();
             _readyList = new Queue<ScheduledItem<TimeSpan>>();
 
-            _nextTimer = new SerialDisposable();
-
             ExitIfEmpty = false;
         }
 
@@ -260,7 +258,7 @@ namespace System.Reactive.Concurrency
                 if (!_disposed)
                 {
                     _disposed = true;
-                    _nextTimer.Dispose();
+                    Disposable.TryDispose(ref _nextTimer);
                     _evt.Release();
                 }
             }
@@ -326,7 +324,7 @@ namespace System.Reactive.Concurrency
                             _nextItem = next;
 
                             var due = next.DueTime - _stopwatch.Elapsed;
-                            _nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due);
+                            Disposable.TrySetSerial(ref _nextTimer, ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due));
                         }
                     }
 

+ 5 - 5
Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

@@ -28,7 +28,7 @@ namespace System.Reactive
         private readonly IObserver<T> _observer;
         private readonly IScheduler _scheduler;
         private readonly ISchedulerLongRunning _longRunning;
-        private readonly SerialDisposable _disposable = new SerialDisposable();
+        private IDisposable _disposable;
 
         public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
         {
@@ -73,11 +73,11 @@ namespace System.Reactive
                     {
                         _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
 
-                        _disposable.Disposable = StableCompositeDisposable.Create
+                        Disposable.TrySetSerial(ref _disposable, StableCompositeDisposable.Create
                         (
                             _dispatcherJob,
                             _dispatcherEventRelease
-                        );
+                        ));
                     }
                 }
             }
@@ -192,7 +192,7 @@ namespace System.Reactive
 
             if (isOwner)
             {
-                _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
+                Disposable.TrySetSerial(ref _disposable, _scheduler.Schedule<object>(null, Run));
             }
         }
 
@@ -304,7 +304,7 @@ namespace System.Reactive
 
             if (disposing)
             {
-                _disposable.Dispose();
+                Disposable.TryDispose(ref _disposable);
             }
         }
     }

+ 5 - 6
Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs

@@ -39,7 +39,7 @@ namespace System.Reactive.PlatformServices
     public class PeriodicTimerSystemClockMonitor : INotifySystemClockChanged
     {
         private readonly TimeSpan _period;
-        private readonly SerialDisposable _timer;
+        private IDisposable _timer;
 
         /// <summary>
         /// Use the Unix milliseconds for the current time
@@ -60,7 +60,6 @@ namespace System.Reactive.PlatformServices
         public PeriodicTimerSystemClockMonitor(TimeSpan period)
         {
             _period = period;
-            _timer = new SerialDisposable();
         }
 
         /// <summary>
@@ -79,13 +78,13 @@ namespace System.Reactive.PlatformServices
             {
                 _systemClockChanged -= value;
 
-                _timer.Disposable = Disposable.Empty;
+                Disposable.TrySetSerial(ref _timer, Disposable.Empty);
             }
         }
 
         private void NewTimer()
         {
-            _timer.Disposable = Disposable.Empty;
+            Disposable.TrySetSerial(ref _timer, Disposable.Empty);
 
             var n = 0L;
             for (; ; )
@@ -93,13 +92,13 @@ namespace System.Reactive.PlatformServices
                 var now = SystemClock.UtcNow.ToUnixTimeMilliseconds();
                 Interlocked.Exchange(ref _lastTimeUnixMillis, now);
 
-                _timer.Disposable = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(TimeChanged, _period);
+                Disposable.TrySetSerial(ref _timer, ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(TimeChanged, _period));
 
                 if (Math.Abs(SystemClock.UtcNow.ToUnixTimeMilliseconds() - now) <= SYNC_MAXDELTA)
                 {
                     break;
                 }
-                if (_timer.Disposable == Disposable.Empty)
+                if (Volatile.Read(ref _timer) == Disposable.Empty)
                 {
                     break;
                 }

+ 16 - 36
Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Reactive.Disposables;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -89,22 +90,27 @@ namespace System.Reactive.Linq.ObservableImpl
                 _handler = handler;
             }
 
-            private SerialDisposable _subscription;
+            bool _once;
+
+            private IDisposable _subscription;
 
             public override void Run(IObservable<TSource> source)
             {
-                _subscription = new SerialDisposable();
-
-                var d1 = new SingleAssignmentDisposable();
-                _subscription.Disposable = d1;
-                d1.Disposable = source.SubscribeSafe(this);
+                Disposable.TrySetSingle(ref _subscription, source.SubscribeSafe(this));
+            }
 
-                SetUpstream(_subscription);
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref _subscription);
+                }
+                base.Dispose(disposing);
             }
 
             public override void OnError(Exception error)
             {
-                if (error is TException e)
+                if (!Volatile.Read(ref _once) && error is TException e)
                 {
                     var result = default(IObservable<TSource>);
                     try
@@ -117,40 +123,14 @@ namespace System.Reactive.Linq.ObservableImpl
                         return;
                     }
 
-                    var d = new SingleAssignmentDisposable();
-                    _subscription.Disposable = d;
-                    d.Disposable = result.SubscribeSafe(new HandlerObserver(this));
+                    Volatile.Write(ref _once, true);
+                    Disposable.TrySetSerial(ref _subscription, result.SubscribeSafe(this));
                 }
                 else
                 {
                     ForwardOnError(error);
                 }
             }
-
-            private sealed class HandlerObserver : IObserver<TSource>
-            {
-                private readonly _ _parent;
-
-                public HandlerObserver(_ parent)
-                {
-                    _parent = parent;
-                }
-
-                public void OnNext(TSource value)
-                {
-                    _parent.ForwardOnNext(value);
-                }
-
-                public void OnError(Exception error)
-                {
-                    _parent.ForwardOnError(error);
-                }
-
-                public void OnCompleted()
-                {
-                    _parent.ForwardOnCompleted();
-                }
-            }
         }
     }
 }

+ 18 - 9
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -802,29 +802,33 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 protected override IDisposable RunCore(SelectorWithSubscriptionDelay parent)
                 {
-                    var subscription = new SerialDisposable();
+                    var delayConsumer = new SubscriptionDelayObserver(this, parent._source);
 
-                    subscription.Disposable = parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelayObserver(this, parent._source, subscription));
+                    delayConsumer.SetFirst(parent._subscriptionDelay.SubscribeSafe(delayConsumer));
 
-                    return subscription;
+                    return delayConsumer;
                 }
 
-                private sealed class SubscriptionDelayObserver : IObserver<TDelay>
+                private sealed class SubscriptionDelayObserver : IObserver<TDelay>, IDisposable
                 {
                     private readonly _ _parent;
                     private readonly IObservable<TSource> _source;
-                    private readonly SerialDisposable _subscription;
+                    private IDisposable _subscription;
 
-                    public SubscriptionDelayObserver(_ parent, IObservable<TSource> source, SerialDisposable subscription)
+                    public SubscriptionDelayObserver(_ parent, IObservable<TSource> source)
                     {
                         _parent = parent;
                         _source = source;
-                        _subscription = subscription;
+                    }
+
+                    internal void SetFirst(IDisposable d)
+                    {
+                        Disposable.TrySetSingle(ref _subscription, d);
                     }
 
                     public void OnNext(TDelay value)
                     {
-                        _subscription.Disposable = _source.SubscribeSafe(_parent);
+                        Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
                     }
 
                     public void OnError(Exception error)
@@ -834,7 +838,12 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     public void OnCompleted()
                     {
-                        _subscription.Disposable = _source.SubscribeSafe(_parent);
+                        Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
+                    }
+
+                    public void Dispose()
+                    {
+                        Disposable.TryDispose(ref _subscription);
                     }
                 }
             }

+ 5 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -5,6 +5,7 @@
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -88,7 +89,7 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly IScheduler _scheduler;
             private readonly TimeSpan _disconnectTime;
             private readonly IConnectableObservable<TSource> _source;
-            private readonly SerialDisposable _serial = new SerialDisposable();
+            private IDisposable _serial;
 
             private int _count;
             private IDisposable _connectableSubscription;
@@ -125,7 +126,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             if (parent._connectableSubscription == null)
                                 parent._connectableSubscription = parent._source.Connect();
 
-                            parent._serial.Disposable = new SingleAssignmentDisposable();
+                            Disposable.TrySetSerial(ref parent._serial, new SingleAssignmentDisposable());
                         }
                     }
 
@@ -137,13 +138,13 @@ namespace System.Reactive.Linq.ObservableImpl
                         {
                             if (--parent._count == 0)
                             {
-                                var cancelable = (SingleAssignmentDisposable)parent._serial.Disposable;
+                                var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref parent._serial);
 
                                 cancelable.Disposable = parent._scheduler.Schedule(cancelable, parent._disconnectTime, (self, state) =>
                                 {
                                     lock (parent._gate)
                                     {
-                                        if (object.ReferenceEquals(parent._serial.Disposable, state))
+                                        if (object.ReferenceEquals(Volatile.Read(ref parent._serial), state))
                                         {
                                             parent._connectableSubscription.Dispose();
                                             parent._connectableSubscription = null;