Bläddra i källkod

Merge pull request #774 from akarnokd/RefCountImprovements

4.x: Improve RefCount() internals
Daniel C. Weber 7 år sedan
förälder
incheckning
b37fb94a38

+ 5 - 0
Rx.NET/Source/src/System.Reactive/Internal/Sink.cs

@@ -30,6 +30,11 @@ namespace System.Reactive
                 Dispose(true);
         }
 
+        /// <summary>
+        /// Override this method to dispose additional resources.
+        /// The method is guaranteed to be called at most once.
+        /// </summary>
+        /// <param name="disposing">If true, the method was called from <see cref="Dispose()"/>.</param>
         protected virtual void Dispose(bool disposing)
         {
             //Calling base.Dispose(true) is not a proper disposal, so we can omit the assignment here.

+ 78 - 18
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -16,54 +16,114 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly IConnectableObservable<TSource> _source;
 
             private readonly object _gate;
-            private int _count;
-            private IDisposable _connectableSubscription;
+            /// <summary>
+            /// Contains the current active connection's state or null
+            /// if no connection is active at the moment.
+            /// Should be manipulated while holding the <see cref="_gate"/> lock.
+            /// </summary>
+            private RefConnection _connection;
 
             public Eager(IConnectableObservable<TSource> source)
             {
                 _source = source;
                 _gate = new object();
-                _count = 0;
-                _connectableSubscription = default(IDisposable);
             }
 
-            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
+            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
 
-            protected override void Run(_ sink) => sink.Run(this);
+            protected override void Run(_ sink) => sink.Run();
 
             internal sealed class _ : IdentitySink<TSource>
             {
-                public _(IObserver<TSource> observer)
+                private readonly Eager _parent;
+                /// <summary>
+                /// Contains the connection reference the downstream observer
+                /// has subscribed to. Its purpose is to
+                /// avoid subscribing, connecting and disconnecting
+                /// while holding a lock.
+                /// </summary>
+                private RefConnection _targetConnection;
+
+                public _(IObserver<TSource> observer, Eager parent)
                     : base(observer)
                 {
+                    _parent = parent;
                 }
 
-                public void Run(Eager parent)
+                public void Run()
                 {
-                    var subscription = parent._source.SubscribeSafe(this);
+                    var doConnect = false;
+                    var conn = default(RefConnection);
 
-                    lock (parent._gate)
+                    lock (_parent._gate)
                     {
-                        if (++parent._count == 1)
+                        // get the active connection state
+                        conn = _parent._connection;
+                        // if null, a new connection should be established
+                        if (conn == null)
                         {
-                            parent._connectableSubscription = parent._source.Connect();
+                            conn = new RefConnection();
+                            // make it the active one
+                            _parent._connection = conn;
                         }
+
+                        // this is the first observer, then connect
+                        doConnect = conn._count++ == 0;
+                        // save the current connection for this observer
+                        _targetConnection = conn;
                     }
 
-                    SetUpstream(Disposable.Create(() =>
+                    // subscribe to the source first
+                    Run(_parent._source);
+                    // then connect the source if necessary
+                    if (doConnect && !Disposable.GetIsDisposed(ref conn._disposable))
                     {
-                        subscription.Dispose();
+                        // this makes sure if the connection ends synchronously
+                        // only the currently known connection is affected
+                        // and a connection from a concurrent reconnection won't
+                        // interfere
+                        Disposable.SetSingle(ref conn._disposable, _parent._source.Connect());
+                    }
+                }
 
-                        lock (parent._gate)
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+                    if (disposing)
+                    {
+                        // get and forget the saved connection
+                        var targetConnection = _targetConnection;
+                        _targetConnection = null;
+
+                        lock (_parent._gate)
                         {
-                            if (--parent._count == 0)
+                            // if the current connection is no longer the saved connection
+                            // or the counter hasn't reached zero yet
+                            if (targetConnection != _parent._connection
+                                || --targetConnection._count != 0)
                             {
-                                parent._connectableSubscription.Dispose();
+                                // nothing to do.
+                                return;
                             }
+                            // forget the current connection
+                            _parent._connection = null;
                         }
-                    }));
+
+                        // disconnect
+                        Disposable.TryDispose(ref targetConnection._disposable);
+                    }
                 }
             }
+
+            /// <summary>
+            /// Holds an individual connection state: the observer count and
+            /// the connection's IDisposable.
+            /// </summary>
+            private sealed class RefConnection
+            {
+                internal int _count;
+                internal IDisposable _disposable;
+            }
         }
 
         internal sealed class Lazy : Producer<TSource, Lazy._>

+ 25 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs

@@ -411,5 +411,30 @@ namespace ReactiveTests.Tests
                 Subscribe(285, 300)
             );
         }
+
+        [Fact]
+        public void RefCount_source_already_completed_synchronously()
+        {
+            var subscribed = 0;
+            var unsubscribed = 0;
+            
+            var o1 = Observable.Create<string>(observer =>
+            {
+                subscribed++;
+                observer.OnCompleted();
+
+                return Disposable.Create(() => unsubscribed++);
+            });
+
+            var o2 = o1.Publish().RefCount();
+
+            var s1 = o2.Subscribe();
+            Assert.Equal(1, subscribed);
+            Assert.Equal(1, unsubscribed);
+
+            var s2 = o2.Subscribe();
+            Assert.Equal(1, subscribed);
+            Assert.Equal(1, unsubscribed);
+        }
     }
 }