Переглянути джерело

Review Merge, save some allocations. (#639)

* Save some usages of SingleAssignmentDisposables in Merge by using new Sink-features and the SafeObserver-class.

* Don't introduce a new Run-method, the base-implementation will handle it just fine.

* Save closure allocation and allow delegate caching.

* Mark _isStopped-fields as volatile.
Daniel C. Weber 7 роки тому
батько
коміт
c417815312
1 змінених файлів з 51 додано та 77 видалено
  1. 51 77
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs

+ 51 - 77
Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs

@@ -24,7 +24,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TSource> observer) => new _(_maxConcurrent, observer);
 
-            protected override void Run(_ sink) => sink.Run(this);
+            protected override void Run(_ sink) => sink.Run(_sources);
 
             internal sealed class _ : Sink<IObservable<TSource>, TSource> 
             {
@@ -36,28 +36,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     _maxConcurrent = maxConcurrent;
                 }
 
-                private object _gate;
-                private Queue<IObservable<TSource>> _q;
-                private bool _isStopped;
-                private SingleAssignmentDisposable _sourceSubscription;
-                private CompositeDisposable _group;
+                private object _gate = new object();
+                private Queue<IObservable<TSource>> _q = new Queue<IObservable<TSource>>();
+                private volatile bool _isStopped;
+                private CompositeDisposable _group = new CompositeDisposable();
                 private int _activeCount = 0;
 
-                public void Run(ObservablesMaxConcurrency parent)
-                {
-                    _gate = new object();
-                    _q = new Queue<IObservable<TSource>>();
-                    _isStopped = false;
-                    _activeCount = 0;
-
-                    _group = new CompositeDisposable();
-                    _sourceSubscription = new SingleAssignmentDisposable();
-                    _sourceSubscription.Disposable = parent._sources.SubscribeSafe(this);
-                    _group.Add(_sourceSubscription);
-
-                    SetUpstream(_group);
-                }
-
                 public override void OnNext(IObservable<TSource> value)
                 {
                     lock (_gate)
@@ -91,36 +75,42 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                         else
                         {
-                            _sourceSubscription.Dispose();
+                            DisposeUpstream();
                         }
                     }
                 }
 
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+
+                    if (disposing)
+                        _group.Dispose();
+                }
+
                 private void Subscribe(IObservable<TSource> innerSource)
                 {
-                    var subscription = new SingleAssignmentDisposable();
-                    _group.Add(subscription);
-                    subscription.Disposable = innerSource.SubscribeSafe(new InnerObserver(this, subscription));
+                    var innerObserver = new InnerObserver(this);
+                    _group.Add(innerObserver);
+                    innerObserver.SetResource(innerSource.SubscribeSafe(innerObserver));
                 }
 
-                private sealed class InnerObserver : IObserver<TSource>
+                private sealed class InnerObserver : SafeObserver<TSource>
                 {
                     private readonly _ _parent;
-                    private readonly IDisposable _self;
 
-                    public InnerObserver(_ parent, IDisposable self)
+                    public InnerObserver(_ parent)
                     {
                         _parent = parent;
-                        _self = self;
                     }
 
-                    public void OnNext(TSource value)
+                    public override void OnNext(TSource value)
                     {
                         lock (_parent._gate)
                             _parent.ForwardOnNext(value);
                     }
 
-                    public void OnError(Exception error)
+                    public override void OnError(Exception error)
                     {
                         lock (_parent._gate)
                         {
@@ -128,9 +118,9 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                     }
 
-                    public void OnCompleted()
+                    public override void OnCompleted()
                     {
-                        _parent._group.Remove(_self);
+                        _parent._group.Remove(this);
                         lock (_parent._gate)
                         {
                             if (_parent._q.Count > 0)
@@ -163,7 +153,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
 
-            protected override void Run(_ sink) => sink.Run(this);
+            protected override void Run(_ sink) => sink.Run(_sources);
 
             internal sealed class _ : Sink<IObservable<TSource>, TSource> 
             {
@@ -172,29 +162,15 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 }
 
-                private object _gate;
-                private bool _isStopped;
-                private CompositeDisposable _group;
-                private SingleAssignmentDisposable _sourceSubscription;
-
-                public void Run(Observables parent)
-                {
-                    _gate = new object();
-                    _isStopped = false;
-                    _group = new CompositeDisposable();
-
-                    _sourceSubscription = new SingleAssignmentDisposable();
-                    _group.Add(_sourceSubscription);
-                    _sourceSubscription.Disposable = parent._sources.SubscribeSafe(this);
-
-                    SetUpstream(_group);
-                }
+                private object _gate = new object();
+                private volatile bool _isStopped;
+                private CompositeDisposable _group = new CompositeDisposable();
 
                 public override void OnNext(IObservable<TSource> value)
                 {
-                    var innerSubscription = new SingleAssignmentDisposable();
-                    _group.Add(innerSubscription);
-                    innerSubscription.Disposable = value.SubscribeSafe(new InnerObserver(this, innerSubscription));
+                    var innerObserver = new InnerObserver(this);
+                    _group.Add(innerObserver);
+                    innerObserver.SetResource(value.SubscribeSafe(innerObserver));
                 }
 
                 public override void OnError(Exception error)
@@ -208,7 +184,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 public override void OnCompleted()
                 {
                     _isStopped = true;
-                    if (_group.Count == 1)
+                    if (_group.Count == 0)
                     {
                         //
                         // Notice there can be a race between OnCompleted of the source and any
@@ -224,28 +200,34 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        _sourceSubscription.Dispose();
+                        DisposeUpstream();
                     }
                 }
 
-                private sealed class InnerObserver : IObserver<TSource>
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+
+                    if (disposing)
+                        _group.Dispose();
+                }
+
+                private sealed class InnerObserver : SafeObserver<TSource>
                 {
                     private readonly _ _parent;
-                    private readonly IDisposable _self;
 
-                    public InnerObserver(_ parent, IDisposable self)
+                    public InnerObserver(_ parent)
                     {
                         _parent = parent;
-                        _self = self;
                     }
 
-                    public void OnNext(TSource value)
+                    public override void OnNext(TSource value)
                     {
                         lock (_parent._gate)
                             _parent.ForwardOnNext(value);
                     }
 
-                    public void OnError(Exception error)
+                    public override void OnError(Exception error)
                     {
                         lock (_parent._gate)
                         {
@@ -253,10 +235,10 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                     }
 
-                    public void OnCompleted()
+                    public override void OnCompleted()
                     {
-                        _parent._group.Remove(_self);
-                        if (_parent._isStopped && _parent._group.Count == 1)
+                        _parent._group.Remove(this);
+                        if (_parent._isStopped && _parent._group.Count == 0)
                         {
                             //
                             // Notice there can be a race between OnCompleted of the source and any
@@ -286,7 +268,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
 
-            protected override void Run(_ sink) => sink.Run(this);
+            protected override void Run(_ sink) => sink.Run(_sources);
 
             internal sealed class _ : Sink<Task<TSource>, TSource> 
             {
@@ -295,16 +277,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 }
 
-                private object _gate;
-                private volatile int _count;
-
-                public void Run(Tasks parent)
-                {
-                    _gate = new object();
-                    _count = 1;
-
-                    SetUpstream(parent._sources.SubscribeSafe(this));
-                }
+                private object _gate = new object();
+                private volatile int _count = 1;
 
                 public override void OnNext(Task<TSource> value)
                 {
@@ -315,7 +289,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        value.ContinueWith(OnCompletedTask);
+                        value.ContinueWith((t, @thisObject) => ((_)@thisObject).OnCompletedTask(t), this);
                     }
                 }