瀏覽代碼

Review Throttle, save some allocations. (#638)

* Don't override Run, the base methode will handle it.

* Save closure allocation and allow delegate caching.

* Save the allocation of a SingleAssignmentDisposable in OnNext. It's purpose is only to dispose of the currently scheduled timer early. We can do that with another call to TrySetSerial, passing null.

* Don't override Run, the base implementation has it covered.

* Let ThrottleObserver inherit from SafeObserver so it can hold onto its own subscription, saving the allocation of a SingleAssignmentDisposable.
Daniel C. Weber 7 年之前
父節點
當前提交
c9bf3f1519
共有 1 個文件被更改,包括 21 次插入39 次删除
  1. 21 39
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs

+ 21 - 39
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs

@@ -36,28 +36,19 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = parent._scheduler;
             }
 
-            private object _gate;
+            private readonly object _gate = new object();
             private TSource _value;
             private bool _hasValue;
             private IDisposable _serialCancelable;
             private ulong _id;
 
-            public override void Run(IObservable<TSource> source)
-            {
-                _gate = new object();
-                _value = default(TSource);
-                _hasValue = false;
-                _id = 0UL;
-
-                base.Run(source);
-            }
-
             protected override void Dispose(bool disposing)
             {
                 if (disposing)
                 {
                     Disposable.TryDispose(ref _serialCancelable);
                 }
+
                 base.Dispose(disposing);
             }
 
@@ -71,12 +62,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     _id = unchecked(_id + 1);
                     currentid = _id;
                 }
-                var d = new SingleAssignmentDisposable();
-                Disposable.TrySetSerial(ref _serialCancelable, d);
-                d.Disposable = _scheduler.Schedule(currentid, _dueTime, Propagate);
+
+                Disposable.TrySetSerial(ref _serialCancelable, null);
+                Disposable.TrySetSerial(ref _serialCancelable, _scheduler.Schedule((@this: this, currentid), _dueTime, (_, tuple) => [email protected](tuple.currentid)));
             }
 
-            private IDisposable Propagate(IScheduler self, ulong currentid)
+            private IDisposable Propagate(ulong currentid)
             {
                 lock (_gate)
                 {
@@ -132,7 +123,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
         protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
 
-        protected override void Run(_ sink) => sink.Run(this);
+        protected override void Run(_ sink) => sink.Run(_source);
 
         internal sealed class _ : IdentitySink<TSource>
         {
@@ -144,22 +135,12 @@ namespace System.Reactive.Linq.ObservableImpl
                 _throttleSelector = parent._throttleSelector;
             }
 
-            private object _gate;
+            private readonly object _gate = new object();
             private TSource _value;
             private bool _hasValue;
             private IDisposable _serialCancelable;
             private ulong _id;
 
-            public void Run(Throttle<TSource, TThrottle> parent)
-            {
-                _gate = new object();
-                _value = default(TSource);
-                _hasValue = false;
-                _id = 0UL;
-
-                base.Run(parent._source);
-            }
-
             protected override void Dispose(bool disposing)
             {
                 if (disposing)
@@ -195,9 +176,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     currentid = _id;
                 }
 
-                var d = new SingleAssignmentDisposable();
-                Disposable.TrySetSerial(ref _serialCancelable, d);
-                d.Disposable = throttle.SubscribeSafe(new ThrottleObserver(this, value, currentid, d));
+                Disposable.TrySetSerial(ref _serialCancelable, null);
+
+                var newInnerObserver = new ThrottleObserver(this, value, currentid);
+                newInnerObserver.SetResource(throttle.SubscribeSafe(newInnerObserver));
+
+                Disposable.TrySetSerial(ref _serialCancelable, newInnerObserver);
             }
 
             public override void OnError(Exception error)
@@ -229,22 +213,20 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
             }
 
-            private sealed class ThrottleObserver : IObserver<TThrottle>
+            private sealed class ThrottleObserver : SafeObserver<TThrottle>
             {
                 private readonly _ _parent;
                 private readonly TSource _value;
                 private readonly ulong _currentid;
-                private readonly IDisposable _self;
 
-                public ThrottleObserver(_ parent, TSource value, ulong currentid, IDisposable self)
+                public ThrottleObserver(_ parent, TSource value, ulong currentid)
                 {
                     _parent = parent;
                     _value = value;
                     _currentid = currentid;
-                    _self = self;
                 }
 
-                public void OnNext(TThrottle value)
+                public override void OnNext(TThrottle value)
                 {
                     lock (_parent._gate)
                     {
@@ -252,11 +234,11 @@ namespace System.Reactive.Linq.ObservableImpl
                             _parent.ForwardOnNext(_value);
 
                         _parent._hasValue = false;
-                        _self.Dispose();
+                        Dispose();
                     }
                 }
 
-                public void OnError(Exception error)
+                public override void OnError(Exception error)
                 {
                     lock (_parent._gate)
                     {
@@ -264,7 +246,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                 }
 
-                public void OnCompleted()
+                public override void OnCompleted()
                 {
                     lock (_parent._gate)
                     {
@@ -272,7 +254,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             _parent.ForwardOnNext(_value);
 
                         _parent._hasValue = false;
-                        _self.Dispose();
+                        Dispose();
                     }
                 }
             }