浏览代码

Improve PushPullAdapter+Sink dispose management (#676)

* 4.x: Improve PushPullAdapter+Sink dispose management

* Clear _collector upon termination
David Karnok 7 年之前
父节点
当前提交
188f391329

+ 18 - 28
Rx.NET/Source/src/System.Reactive/Linq/Observable/Collect.cs

@@ -18,50 +18,38 @@ namespace System.Reactive.Linq.ObservableImpl
             _getNewCollector = getNewCollector;
         }
 
-        protected override PushToPullSink<TSource, TResult> Run(IDisposable subscription)
-        {
-            var sink = new _(this, subscription);
-            sink.Initialize();
-            return sink;
-        }
+        protected override PushToPullSink<TSource, TResult> Run() => new _(_merge, _getNewCollector, _getInitialCollector());
 
         private sealed class _ : PushToPullSink<TSource, TResult>
         {
-            // CONSIDER: This sink has a parent reference that can be considered for removal.
+            readonly object _gate;
+            readonly Func<TResult, TSource, TResult> _merge;
+            readonly Func<TResult, TResult> _getNewCollector;
 
-            private readonly Collect<TSource, TResult> _parent;
-
-            public _(Collect<TSource, TResult> parent, IDisposable subscription)
-                : base(subscription)
+            public _(Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector, TResult collector)
             {
-                _parent = parent;
+                _gate = new object();
+                _merge = merge;
+                _getNewCollector = getNewCollector;
+                _collector = collector;
             }
 
-            private object _gate;
             private TResult _collector;
-            private bool _hasFailed;
             private Exception _error;
             private bool _hasCompleted;
             private bool _done;
 
-            public void Initialize()
-            {
-                _gate = new object();
-                _collector = _parent._getInitialCollector();
-            }
-
             public override void OnNext(TSource value)
             {
                 lock (_gate)
                 {
                     try
                     {
-                        _collector = _parent._merge(_collector, value);
+                        _collector = _merge(_collector, value);
                     }
                     catch (Exception ex)
                     {
                         _error = ex;
-                        _hasFailed = true;
 
                         Dispose();
                     }
@@ -75,7 +63,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 lock (_gate)
                 {
                     _error = error;
-                    _hasFailed = true;
                 }
             }
 
@@ -93,10 +80,12 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 lock (_gate)
                 {
-                    if (_hasFailed)
+                    var error = _error;
+                    if (error != null)
                     {
-                        current = default(TResult);
-                        _error.Throw();
+                        current = default;
+                        _collector = default;
+                        error.Throw();
                     }
                     else
                     {
@@ -104,7 +93,8 @@ namespace System.Reactive.Linq.ObservableImpl
                         {
                             if (_done)
                             {
-                                current = default(TResult);
+                                current = default;
+                                _collector = default;
                                 return false;
                             }
 
@@ -117,7 +107,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                             try
                             {
-                                _collector = _parent._getNewCollector(current);
+                                _collector = _getNewCollector(current);
                             }
                             catch
                             {

+ 3 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Latest.cs

@@ -13,9 +13,9 @@ namespace System.Reactive.Linq.ObservableImpl
         {
         }
 
-        protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
+        protected override PushToPullSink<TSource, TSource> Run()
         {
-            return new _(subscription);
+            return new _();
         }
 
         private sealed class _ : PushToPullSink<TSource, TSource>
@@ -23,8 +23,7 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly object _gate;
             private readonly SemaphoreSlim _semaphore;
 
-            public _(IDisposable subscription)
-                : base(subscription)
+            public _()
             {
                 _gate = new object();
                 _semaphore = new SemaphoreSlim(0, 1);

+ 3 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/MostRecent.cs

@@ -14,15 +14,14 @@ namespace System.Reactive.Linq.ObservableImpl
             _initialValue = initialValue;
         }
 
-        protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
+        protected override PushToPullSink<TSource, TSource> Run()
         {
-            return new _(_initialValue, subscription);
+            return new _(_initialValue);
         }
 
         private sealed class _ : PushToPullSink<TSource, TSource>
         {
-            public _(TSource initialValue, IDisposable subscription)
-                : base(subscription)
+            public _(TSource initialValue)
             {
                 _kind = NotificationKind.OnNext;
                 _value = initialValue;

+ 3 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Next.cs

@@ -13,9 +13,9 @@ namespace System.Reactive.Linq.ObservableImpl
         {
         }
 
-        protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
+        protected override PushToPullSink<TSource, TSource> Run()
         {
-            return new _(subscription);
+            return new _();
         }
 
         private sealed class _ : PushToPullSink<TSource, TSource>
@@ -23,8 +23,7 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly object _gate;
             private readonly SemaphoreSlim _semaphore;
 
-            public _(IDisposable subscription)
-                : base(subscription)
+            public _()
             {
                 _gate = new object();
                 _semaphore = new SemaphoreSlim(0, 1);

+ 11 - 12
Rx.NET/Source/src/System.Reactive/Linq/Observable/PushToPullAdapter.cs

@@ -21,23 +21,17 @@ namespace System.Reactive.Linq.ObservableImpl
 
         public IEnumerator<TResult> GetEnumerator()
         {
-            var d = new SingleAssignmentDisposable();
-            var res = Run(d);
-            d.Disposable = _source.SubscribeSafe(res);
+            var res = Run();
+            res.SetUpstream(_source.SubscribeSafe(res));
             return res;
         }
 
-        protected abstract PushToPullSink<TSource, TResult> Run(IDisposable subscription);
+        protected abstract PushToPullSink<TSource, TResult> Run();
     }
 
     internal abstract class PushToPullSink<TSource, TResult> : IObserver<TSource>, IEnumerator<TResult>, IDisposable
     {
-        private readonly IDisposable _subscription;
-
-        public PushToPullSink(IDisposable subscription)
-        {
-            _subscription = subscription;
-        }
+        private IDisposable _upstream;
 
         public abstract void OnNext(TSource value);
         public abstract void OnError(Exception error);
@@ -59,7 +53,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 else
                 {
                     _done = true;
-                    _subscription.Dispose();
+                    Dispose();
                 }
             }
 
@@ -81,7 +75,12 @@ namespace System.Reactive.Linq.ObservableImpl
 
         public void Dispose()
         {
-            _subscription.Dispose();
+            Disposable.TryDispose(ref _upstream);
+        }
+
+        public void SetUpstream(IDisposable d)
+        {
+            Disposable.SetSingle(ref _upstream, d);
         }
     }
 }