فهرست منبع

Rework SelectMany (#604)

Daniel C. Weber 7 سال پیش
والد
کامیت
ad99fb5dfb

+ 5 - 0
Rx.NET/Source/src/System.Reactive/Internal/Sink.cs

@@ -56,6 +56,11 @@ namespace System.Reactive
         {
             Disposable.SetSingle(ref _upstream, upstream);
         }
+
+        protected void DisposeUpstream()
+        {
+            Disposable.TryDispose(ref _upstream);
+        }
     }
 
     /// <summary>

+ 59 - 74
Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs

@@ -856,28 +856,15 @@ namespace System.Reactive.Linq.ObservableImpl
             internal class _ : Sink<TSource, TResult> 
             {
                 protected readonly object _gate = new object();
-                private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
-                private readonly CompositeDisposable _group = new CompositeDisposable();
-
                 private readonly Func<TSource, IObservable<TResult>> _selector;
+                private readonly CompositeDisposable _group = new CompositeDisposable();
+                
+                private bool _isStopped;
 
                 public _(ObservableSelector parent, IObserver<TResult> observer)
                     : base(observer)
                 {
                     _selector = parent._selector;
-
-                    _group.Add(_sourceSubscription);
-                }
-
-                private bool _isStopped;
-
-                public override void Run(IObservable<TSource> source)
-                {
-                    _isStopped = false;
-
-                    _sourceSubscription.Disposable = source.SubscribeSafe(this);
-
-                    SetUpstream(_group);
                 }
 
                 public override void OnNext(TSource value)
@@ -913,14 +900,22 @@ namespace System.Reactive.Linq.ObservableImpl
                     Final();
                 }
 
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+
+                    if (disposing)
+                        _group.Dispose();
+                }
+
                 protected void Final()
                 {
                     _isStopped = true;
-                    if (_group.Count == 1)
+                    if (_group.Count == 0)
                     {
                         //
                         // Notice there can be a race between OnCompleted of the source and any
-                        // of the inner sequences, where both see _group.Count == 1, and one is
+                        // of the inner sequences, where both see _group.Count == 0, and one is
                         // waiting for the lock. There won't be a double OnCompleted observation
                         // though, because the call to Dispose silences the observer by swapping
                         // in a NopObserver<T>.
@@ -932,35 +927,34 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        _sourceSubscription.Dispose();
+                        DisposeUpstream();
                     }
                 }
 
                 protected void SubscribeInner(IObservable<TResult> inner)
                 {
-                    var innerSubscription = new SingleAssignmentDisposable();
-                    _group.Add(innerSubscription);
-                    innerSubscription.Disposable = inner.SubscribeSafe(new InnerObserver(this, innerSubscription));
+                    var innerObserver = new InnerObserver(this);
+
+                    _group.Add(innerObserver);
+                    innerObserver.SetResource(inner.SubscribeSafe(innerObserver));
                 }
 
-                private sealed class InnerObserver : IObserver<TResult>
+                private sealed class InnerObserver : SafeObserver<TResult>
                 {
                     private readonly _ _parent;
-                    private readonly IDisposable _self;
 
-                    public InnerObserver(_ parent, IDisposable self)
+                    public InnerObserver(_ parent)
                     {
                         _parent = parent;
-                        _self = self;
                     }
 
-                    public void OnNext(TResult value)
+                    public override void OnNext(TResult value)
                     {
                         lock (_parent._gate)
                             _parent.ForwardOnNext(value);
                     }
 
-                    public void OnError(Exception error)
+                    public override void OnError(Exception error)
                     {
                         lock (_parent._gate)
                         {
@@ -968,10 +962,10 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                     }
 
-                    public void OnCompleted()
+                    public override void OnCompleted()
                     {
-                        _parent._group.Remove(_self);
-                        if (_parent._isStopped && _parent._group.Count == 1)
+                        _parent._group.Remove(this);
+                        if (_parent._isStopped && _parent._group.Count == 0)
                         {
                             //
                             // Notice there can be a race between OnCompleted of the source and any
@@ -1090,29 +1084,17 @@ namespace System.Reactive.Linq.ObservableImpl
             internal class _ : Sink<TSource, TResult> 
             {
                 private readonly object _gate = new object();
-                private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
                 private readonly CompositeDisposable _group = new CompositeDisposable();
 
                 protected readonly Func<TSource, int, IObservable<TResult>> _selector;
 
+                private int _index;
+                private bool _isStopped;
+
                 public _(ObservableSelectorIndexed parent, IObserver<TResult> observer)
                     : base(observer)
                 {
                     _selector = parent._selector;
-
-                    _group.Add(_sourceSubscription);
-                }
-
-                private bool _isStopped;
-                private int _index;
-
-                public override void Run(IObservable<TSource> source)
-                {
-                    _isStopped = false;
-
-                    _sourceSubscription.Disposable = source.SubscribeSafe(this);
-
-                    SetUpstream(_group);
                 }
 
                 public override void OnNext(TSource value)
@@ -1148,10 +1130,18 @@ namespace System.Reactive.Linq.ObservableImpl
                     Final();
                 }
 
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+
+                    if (disposing)
+                        _group.Dispose();
+                }
+
                 protected void Final()
                 {
                     _isStopped = true;
-                    if (_group.Count == 1)
+                    if (_group.Count == 0)
                     {
                         //
                         // Notice there can be a race between OnCompleted of the source and any
@@ -1167,35 +1157,34 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        _sourceSubscription.Dispose();
+                        DisposeUpstream();
                     }
                 }
 
                 protected void SubscribeInner(IObservable<TResult> inner)
                 {
-                    var innerSubscription = new SingleAssignmentDisposable();
-                    _group.Add(innerSubscription);
-                    innerSubscription.Disposable = inner.SubscribeSafe(new InnerObserver(this, innerSubscription));
+                    var innerObserver = new InnerObserver(this);
+
+                    _group.Add(innerObserver);
+                    innerObserver.SetResource(inner.SubscribeSafe(innerObserver));
                 }
 
-                private sealed class InnerObserver : IObserver<TResult>
+                private sealed class InnerObserver : SafeObserver<TResult>
                 {
                     private readonly _ _parent;
-                    private readonly IDisposable _self;
 
-                    public InnerObserver(_ parent, IDisposable self)
+                    public InnerObserver(_ parent)
                     {
                         _parent = parent;
-                        _self = self;
                     }
 
-                    public void OnNext(TResult value)
+                    public override void OnNext(TResult value)
                     {
                         lock (_parent._gate)
                             _parent.ForwardOnNext(value);
                     }
 
-                    public void OnError(Exception error)
+                    public override void OnError(Exception error)
                     {
                         lock (_parent._gate)
                         {
@@ -1203,10 +1192,10 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                     }
 
-                    public void OnCompleted()
+                    public override void OnCompleted()
                     {
-                        _parent._group.Remove(_self);
-                        if (_parent._isStopped && _parent._group.Count == 1)
+                        _parent._group.Remove(this);
+                        if (_parent._isStopped && _parent._group.Count == 0)
                         {
                             //
                             // Notice there can be a race between OnCompleted of the source and any
@@ -1242,8 +1231,6 @@ namespace System.Reactive.Linq.ObservableImpl
             new internal sealed class _ : ObservableSelectorIndexed._
             {
                 private readonly object _gate = new object();
-                private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
-                private readonly CompositeDisposable _group = new CompositeDisposable();
 
                 private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
                 private readonly Func<IObservable<TResult>> _selectorOnCompleted;
@@ -1253,8 +1240,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _selectorOnError = parent._selectorOnError;
                     _selectorOnCompleted = parent._selectorOnCompleted;
-
-                    _group.Add(_sourceSubscription);
                 }
 
                 public override void OnError(Exception error)
@@ -1497,7 +1482,7 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : Sink<TSource, TResult> 
             {
                 private readonly object _gate = new object();
-                private readonly CancellationDisposable _cancel = new CancellationDisposable();
+                private readonly CancellationTokenSource _cts = new CancellationTokenSource();
 
                 private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;
 
@@ -1520,7 +1505,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     if (disposing)
                     {
-                        _cancel.Dispose();
+                        _cts.Cancel();
                     }
                     base.Dispose(disposing);
                 }
@@ -1531,7 +1516,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     try
                     {
                         Interlocked.Increment(ref _count);
-                        task = _selector(value, _cancel.Token);
+                        task = _selector(value, _cts.Token);
                     }
                     catch (Exception ex)
                     {
@@ -1549,7 +1534,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        task.ContinueWith(OnCompletedTask);
+                        task.ContinueWith((closureTask, @thisObject) => ((_)@thisObject).OnCompletedTask(closureTask), this);
                     }
                 }
 
@@ -1575,7 +1560,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             break;
                         case TaskStatus.Canceled:
                             {
-                                if (!_cancel.IsDisposed)
+                                if (!_cts.IsCancellationRequested)
                                 {
                                     lock (_gate)
                                     {
@@ -1626,7 +1611,7 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : Sink<TSource, TResult> 
             {
                 private readonly object _gate = new object();
-                private readonly CancellationDisposable _cancel = new CancellationDisposable();
+                private readonly CancellationTokenSource _cts = new CancellationTokenSource();
 
                 private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;
 
@@ -1650,7 +1635,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     if (disposing)
                     {
-                        _cancel.Dispose();
+                        _cts.Cancel();
                     }
                     base.Dispose(disposing);
                 }
@@ -1661,7 +1646,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     try
                     {
                         Interlocked.Increment(ref _count);
-                        task = _selector(value, checked(_index++), _cancel.Token);
+                        task = _selector(value, checked(_index++), _cts.Token);
                     }
                     catch (Exception ex)
                     {
@@ -1679,7 +1664,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        task.ContinueWith(OnCompletedTask);
+                        task.ContinueWith((closureTask, @thisObject) => ((_)@thisObject).OnCompletedTask(closureTask), this);
                     }
                 }
 
@@ -1705,7 +1690,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             break;
                         case TaskStatus.Canceled:
                             {
-                                if (!_cancel.IsDisposed)
+                                if (!_cts.IsCancellationRequested)
                                 {
                                     lock (_gate)
                                     {