1
0
Эх сурвалжийг харах

Save some more allocations in SelectMany. (#640)

* Save more allocations in SelectMany.

* Mark _isStopped-fields as volatile.

* Save another allocation of a SingleAssignmentDisposable.

* Save the allocation of a SingleAssignmentDisposable, no need to override Run any more.
Daniel C. Weber 7 жил өмнө
parent
commit
24c3fbcca5

+ 44 - 56
Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs

@@ -31,7 +31,6 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : Sink<TSource, TResult> 
             {
                 private readonly object _gate = new object();
-                private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
                 private readonly CompositeDisposable _group = new CompositeDisposable();
 
                 private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
@@ -42,20 +41,9 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _collectionSelector = parent._collectionSelector;
                     _resultSelector = parent._resultSelector;
-
-                    _group.Add(_sourceSubscription);
                 }
 
-                private bool _isStopped;
-
-                public override void Run(IObservable<TSource> source)
-                {
-                    _isStopped = false;
-
-                    _sourceSubscription.Disposable = source.SubscribeSafe(this);
-
-                    SetUpstream(_group);
-                }
+                private volatile bool _isStopped;
 
                 public override void OnNext(TSource value)
                 {
@@ -74,9 +62,9 @@ namespace System.Reactive.Linq.ObservableImpl
                         return;
                     }
 
-                    var innerSubscription = new SingleAssignmentDisposable();
-                    _group.Add(innerSubscription);
-                    innerSubscription.Disposable = collection.SubscribeSafe(new InnerObserver(this, value, innerSubscription));
+                    var innerObserver = new InnerObserver(this, value);
+                    _group.Add(innerObserver);
+                    innerObserver.SetResource(collection.SubscribeSafe(innerObserver));
                 }
 
                 public override void OnError(Exception error)
@@ -90,7 +78,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 public override void OnCompleted()
                 {
                     _isStopped = true;
-                    if (_group.Count == 1)
+                    if (_group.Count == 0)
                     {
                         //
                         // Notice there can be a race between OnCompleted of the source and any
@@ -106,24 +94,30 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        _sourceSubscription.Dispose();
+                        DisposeUpstream();
                     }
                 }
 
-                private sealed class InnerObserver : IObserver<TCollection>
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+
+                    if (disposing)
+                        _group.Dispose();
+                }
+
+                private sealed class InnerObserver : SafeObserver<TCollection>
                 {
                     private readonly _ _parent;
                     private readonly TSource _value;
-                    private readonly IDisposable _self;
 
-                    public InnerObserver(_ parent, TSource value, IDisposable self)
+                    public InnerObserver(_ parent, TSource value)
                     {
                         _parent = parent;
                         _value = value;
-                        _self = self;
                     }
 
-                    public void OnNext(TCollection value)
+                    public override void OnNext(TCollection value)
                     {
                         var res = default(TResult);
 
@@ -144,7 +138,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             _parent.ForwardOnNext(res);
                     }
 
-                    public void OnError(Exception error)
+                    public override void OnError(Exception error)
                     {
                         lock (_parent._gate)
                         {
@@ -152,10 +146,10 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                     }
 
-                    public void OnCompleted()
+                    public override void OnCompleted()
                     {
-                        _parent._group.Remove(_self);
-                        if (_parent._isStopped && _parent._group.Count == 1)
+                        _parent._group.Remove(this);
+                        if (_parent._isStopped && _parent._group.Count == 0)
                         {
                             //
                             // Notice there can be a race between OnCompleted of the source and any
@@ -194,7 +188,6 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : Sink<TSource, TResult> 
             {
                 private readonly object _gate = new object();
-                private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
                 private readonly CompositeDisposable _group = new CompositeDisposable();
 
                 private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelector;
@@ -205,22 +198,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _collectionSelector = parent._collectionSelector;
                     _resultSelector = parent._resultSelector;
-
-                    _group.Add(_sourceSubscription);
                 }
 
-                private bool _isStopped;
+                private volatile bool _isStopped;
                 private int _index;
 
-                public override void Run(IObservable<TSource> source)
-                {
-                    _isStopped = false;
-
-                    _sourceSubscription.Disposable = source.SubscribeSafe(this);
-
-                    SetUpstream(_group);
-                }
-
                 public override void OnNext(TSource value)
                 {
                     var index = checked(_index++);
@@ -239,9 +221,9 @@ namespace System.Reactive.Linq.ObservableImpl
                         return;
                     }
 
-                    var innerSubscription = new SingleAssignmentDisposable();
-                    _group.Add(innerSubscription);
-                    innerSubscription.Disposable = collection.SubscribeSafe(new InnerObserver(this, value, index, innerSubscription));
+                    var innerObserver = new InnerObserver(this, value, index);
+                    _group.Add(innerObserver);
+                    innerObserver.SetResource(collection.SubscribeSafe(innerObserver));
                 }
 
                 public override void OnError(Exception error)
@@ -255,7 +237,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 public override void OnCompleted()
                 {
                     _isStopped = true;
-                    if (_group.Count == 1)
+                    if (_group.Count == 0)
                     {
                         //
                         // Notice there can be a race between OnCompleted of the source and any
@@ -271,28 +253,34 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        _sourceSubscription.Dispose();
+                        DisposeUpstream();
                     }
                 }
 
-                private sealed class InnerObserver : IObserver<TCollection>
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+
+                    if (disposing)
+                        _group.Dispose();
+                }
+
+                private sealed class InnerObserver : SafeObserver<TCollection>
                 {
                     private readonly _ _parent;
                     private readonly TSource _value;
                     private readonly int _valueIndex;
-                    private readonly IDisposable _self;
 
-                    public InnerObserver(_ parent, TSource value, int index, IDisposable self)
+                    public InnerObserver(_ parent, TSource value, int index)
                     {
                         _parent = parent;
                         _value = value;
                         _valueIndex = index;
-                        _self = self;
                     }
 
                     private int _index;
 
-                    public void OnNext(TCollection value)
+                    public override void OnNext(TCollection value)
                     {
                         var res = default(TResult);
 
@@ -313,7 +301,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             _parent.ForwardOnNext(res);
                     }
 
-                    public void OnError(Exception error)
+                    public override void OnError(Exception error)
                     {
                         lock (_parent._gate)
                         {
@@ -321,10 +309,10 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                     }
 
-                    public void OnCompleted()
+                    public override void OnCompleted()
                     {
-                        _parent._group.Remove(_self);
-                        if (_parent._isStopped && _parent._group.Count == 1)
+                        _parent._group.Remove(this);
+                        if (_parent._isStopped && _parent._group.Count == 0)
                         {
                             //
                             // Notice there can be a race between OnCompleted of the source and any
@@ -859,7 +847,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly Func<TSource, IObservable<TResult>> _selector;
                 private readonly CompositeDisposable _group = new CompositeDisposable();
                 
-                private bool _isStopped;
+                private volatile bool _isStopped;
 
                 public _(ObservableSelector parent, IObserver<TResult> observer)
                     : base(observer)
@@ -1089,7 +1077,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 protected readonly Func<TSource, int, IObservable<TResult>> _selector;
 
                 private int _index;
-                private bool _isStopped;
+                private volatile bool _isStopped;
 
                 public _(ObservableSelectorIndexed parent, IObserver<TResult> observer)
                     : base(observer)