Bläddra i källkod

Merge pull request #772 from danielcweber/RevertRefCountEager

Revert changes to RefCount.
Daniel C. Weber 7 år sedan
förälder
incheckning
c76f29c2f2

+ 18 - 32
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -14,6 +14,7 @@ namespace System.Reactive.Linq.ObservableImpl
         internal sealed class Eager : Producer<TSource, Eager._>
         {
             private readonly IConnectableObservable<TSource> _source;
+
             private readonly object _gate;
             private int _count;
             private IDisposable _connectableSubscription;
@@ -22,60 +23,45 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 _source = source;
                 _gate = new object();
+                _count = 0;
+                _connectableSubscription = default(IDisposable);
             }
 
-            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
+            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
 
-            protected override void Run(_ sink) => sink.Run();
+            protected override void Run(_ sink) => sink.Run(this);
 
             internal sealed class _ : IdentitySink<TSource>
             {
-                private readonly Eager _parent;
-
-                public _(IObserver<TSource> observer, Eager parent)
+                public _(IObserver<TSource> observer)
                     : base(observer)
                 {
-                    _parent = parent;
                 }
 
-                public void Run()
+                public void Run(Eager parent)
                 {
-                    Run(_parent._source);
+                    var subscription = parent._source.SubscribeSafe(this);
 
-                    lock (_parent._gate)
+                    lock (parent._gate)
                     {
-                        if (++_parent._count == 1)
+                        if (++parent._count == 1)
                         {
-                            // We need to set _connectableSubscription to something
-                            // before Connect because if Connect terminates synchronously,
-                            // Dispose(bool) gets executed and will try to dispose
-                            // _connectableSubscription of null.
-                            // ?.Dispose() is no good because the dispose action has to be
-                            // executed anyway.
-                            // We can't inline SAD either because the IDisposable of Connect
-                            // may belong to the wrong connection.
-                            var sad = new SingleAssignmentDisposable();
-                            _parent._connectableSubscription = sad;
-
-                            sad.Disposable = _parent._source.Connect();
+                            parent._connectableSubscription = parent._source.Connect();
                         }
                     }
-                }
-
-                protected override void Dispose(bool disposing)
-                {
-                    base.Dispose(disposing);
 
-                    if (disposing)
+                    SetUpstream(Disposable.Create(() =>
                     {
-                        lock (_parent._gate)
+                        subscription.Dispose();
+
+                        lock (parent._gate)
                         {
-                            if (--_parent._count == 0)
+                            if (--parent._count == 0)
                             {
-                                _parent._connectableSubscription.Dispose();
+                                parent._connectableSubscription.Dispose();
                             }
                         }
-                    }
+                    }));
                 }
             }
         }

+ 60 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs

@@ -3,8 +3,11 @@
 // See the LICENSE file in the project root for more information. 
 
 using System;
+using System.Reactive;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 using System.Reactive.Linq;
+using System.Reactive.Subjects;
 using Microsoft.Reactive.Testing;
 using Xunit;
 
@@ -12,6 +15,25 @@ namespace ReactiveTests.Tests
 {
     public class RefCountTest : ReactiveTest
     {
+        private sealed class DematerializingConnectableObservable<T> : IConnectableObservable<T>
+        {
+            private readonly IConnectableObservable<Notification<T>> _subject;
+
+            public DematerializingConnectableObservable(IConnectableObservable<Notification<T>> subject)
+            {
+                _subject = subject;
+            }
+
+            public IDisposable Subscribe(IObserver<T> observer)
+            {
+                return _subject.Dematerialize().Subscribe(observer);
+            }
+
+            public IDisposable Connect()
+            {
+                return _subject.Connect();
+            }
+        }
 
         [Fact]
         public void RefCount_ArgumentChecking()
@@ -177,6 +199,44 @@ namespace ReactiveTests.Tests
             );
         }
 
+        [Fact]
+        public void RefCount_can_connect_again_if_previous_subscription_terminated_synchronously()
+        {
+            var seen = 0;
+            var terminated = false;
+
+            var subject = new ReplaySubject<Notification<int>>(1);
+            var connectable = new DematerializingConnectableObservable<int>(subject.Publish());
+            var refCount = connectable.RefCount();
+
+            subject.OnNext(Notification.CreateOnNext(36));
+
+            using (refCount.Subscribe(value => seen = value, () => terminated = true))
+            {
+                Assert.Equal(36, seen);
+            }
+
+            seen = 0;
+            terminated = false;
+            subject.OnNext(Notification.CreateOnCompleted<int>());
+
+            using (refCount.Subscribe(value => seen = value, () => terminated = true))
+            {
+                Assert.Equal(0, seen);
+                Assert.True(terminated);
+            }
+
+            seen = 0;
+            terminated = false;
+            subject.OnNext(Notification.CreateOnNext(36));
+
+            using (refCount.Subscribe(value => seen = value, () => terminated = true))
+            {
+                Assert.Equal(36, seen);
+                Assert.False(terminated);
+            }
+        }
+        
         [Fact]
         public void LazyRefCount_ArgumentChecking()
         {