瀏覽代碼

4.x: Improve RefCount() internals

akarnokd 7 年之前
父節點
當前提交
4893eb807b

+ 5 - 0
Rx.NET/Source/src/System.Reactive/Internal/Sink.cs

@@ -30,6 +30,11 @@ namespace System.Reactive
                 Dispose(true);
         }
 
+        /// <summary>
+        /// Override this method to dispose additional resources.
+        /// The method is guaranteed to be called at most once.
+        /// </summary>
+        /// <param name="disposing">If true, the method was called from <see cref="Dispose()"/>.</param>
         protected virtual void Dispose(bool disposing)
         {
             //Calling base.Dispose(true) is not a proper disposal, so we can omit the assignment here.

+ 76 - 20
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -16,54 +16,110 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly IConnectableObservable<TSource> _source;
 
             private readonly object _gate;
-            private int _count;
-            private IDisposable _connectableSubscription;
+            /// <summary>
+            /// Contains the current active connection's state or null
+            /// if no connection is active at the moment.
+            /// Should be manipulated while holding the <see cref="_gate"/> lock.
+            /// </summary>
+            private RefConnection _connection;
 
             public Eager(IConnectableObservable<TSource> source)
             {
                 _source = source;
                 _gate = new object();
-                _count = 0;
-                _connectableSubscription = default(IDisposable);
             }
 
-            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
+            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
 
-            protected override void Run(_ sink) => sink.Run(this);
+            protected override void Run(_ sink) => sink.Run();
 
             internal sealed class _ : IdentitySink<TSource>
             {
-                public _(IObserver<TSource> observer)
+                private readonly Eager _parent;
+                /// <summary>
+                /// Contains the connection reference the downstream observer
+                /// has subscribed to. Its purpose is to
+                /// avoid subscribing, connecting and disconnecting
+                /// while holding a lock.
+                /// </summary>
+                private RefConnection _targetConnection;
+
+                public _(IObserver<TSource> observer, Eager parent)
                     : base(observer)
                 {
+                    _parent = parent;
                 }
 
-                public void Run(Eager parent)
+                public void Run()
                 {
-                    var subscription = parent._source.SubscribeSafe(this);
+                    var doConnect = false;
+                    var conn = default(RefConnection);
 
-                    lock (parent._gate)
+                    lock (_parent._gate)
                     {
-                        if (++parent._count == 1)
+                        // get the active connection state
+                        conn = _parent._connection;
+                        // if null, a new connection should be established
+                        if (conn == null)
                         {
-                            parent._connectableSubscription = parent._source.Connect();
+                            conn = new RefConnection();
+                            // make it the active one
+                            _parent._connection = conn;
                         }
+
+                        // this is the first observer, then connect
+                        doConnect = conn._count++ == 0;
+                        // save the current connection for this observer
+                        _targetConnection = conn;
                     }
 
-                    SetUpstream(Disposable.Create(() =>
+                    // subscribe to the source first
+                    Run(_parent._source);
+                    // then connect the source if necessary
+                    if (doConnect)
                     {
-                        subscription.Dispose();
+                        // this makes sure if the connection ends synchronously
+                        // only the currently known connection is affected
+                        // and a connection from a concurrent reconnection won't
+                        // interfere
+                        Disposable.SetSingle(ref conn._disposable, _parent._source.Connect());
+                    }
+                }
 
-                        lock (parent._gate)
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+                    // get and forget the saved connection
+                    var targetConnection = _targetConnection;
+                    _targetConnection = null;
+
+                    lock (_parent._gate) {
+                        // if the current connection is no longer the saved connection
+                        // or the counter hasn't reached zero yet
+                        if (targetConnection != _parent._connection
+                            || --targetConnection._count != 0)
                         {
-                            if (--parent._count == 0)
-                            {
-                                parent._connectableSubscription.Dispose();
-                            }
+                            // nothing to do.
+                            return;
                         }
-                    }));
+                        // forget the current connection
+                        _parent._connection = null;
+                    }
+
+                    // disconnect
+                    Disposable.TryDispose(ref targetConnection._disposable);
                 }
             }
+
+            /// <summary>
+            /// Holds an individual connection state: the observer count and
+            /// the connection's IDisposable.
+            /// </summary>
+            private sealed class RefConnection
+            {
+                internal int _count;
+                internal IDisposable _disposable;
+            }
         }
 
         internal sealed class Lazy : Producer<TSource, Lazy._>