Browse Source

Added lazy RefCount operator for IObservables. Lazy RefCount connects like RefCount but may delay disconnection. This is useful whenever a lot of connect/disconnect cycles are expected within a short timespan but with a significant overhead in connecting/disconnecting. Some unit tests have been added. Lazy RefCount has been excluded from methods that must be present for Qbservable as well. I leave it up to others to decide what Lazy RefCount means for Qbservable and whether there should be an implementation. (#133)

Daniel C. Weber 7 years ago
parent
commit
bdb54133ab

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

@@ -307,6 +307,8 @@ namespace System.Reactive.Linq
         IConnectableObservable<TSource> PublishLast<TSource>(IObservable<TSource> source);
         IObservable<TResult> PublishLast<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);
         IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source);
+        IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay);
+        IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler schedulder);
         IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source);
         IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, IScheduler scheduler);
         IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);

+ 42 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs

@@ -203,6 +203,48 @@ namespace System.Reactive.Linq
             return s_impl.RefCount<TSource>(source);
         }
 
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, TimeSpan disconnectDelay)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+
+            if (disconnectDelay < TimeSpan.Zero)
+                throw new ArgumentException("disconnectDelay");
+
+            return s_impl.RefCount<TSource>(source, disconnectDelay);
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
+        /// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler scheduler)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            if (disconnectDelay < TimeSpan.Zero)
+                throw new ArgumentException("disconnectDelay");
+
+            return s_impl.RefCount<TSource>(source, disconnectDelay, scheduler);
+        }
+
         #endregion
 
         #region + AutoConnect +

+ 126 - 45
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -2,78 +2,159 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class RefCount<TSource> : Producer<TSource, RefCount<TSource>._>
+    internal static class RefCount<TSource>
     {
-        private readonly IConnectableObservable<TSource> _source;
-
-        private readonly object _gate;
-        private int _count;
-        private IDisposable _connectableSubscription;
-
-        public RefCount(IConnectableObservable<TSource> source)
+        internal sealed class Eager : Producer<TSource, Eager._>
         {
-            _source = source;
-            _gate = new object();
-            _count = 0;
-            _connectableSubscription = default(IDisposable);
-        }
+            private readonly IConnectableObservable<TSource> _source;
 
-        protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
-
-        protected override void Run(_ sink) => sink.Run();
-
-        internal sealed class _ : IdentitySink<TSource>
-        {
-            readonly RefCount<TSource> _parent;
+            private readonly object _gate;
+            private int _count;
+            private IDisposable _connectableSubscription;
 
-            public _(IObserver<TSource> observer, RefCount<TSource> parent)
-                : base(observer)
+            public Eager(IConnectableObservable<TSource> source)
             {
-                this._parent = parent;
+                _source = source;
+                _gate = new object();
+                _count = 0;
+                _connectableSubscription = default(IDisposable);
             }
 
-            public void Run()
+            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
+
+            protected override void Run(_ sink) => sink.Run();
+
+            internal sealed class _ : IdentitySink<TSource>
             {
-                base.Run(_parent._source);
+                readonly Eager _parent;
+
+                public _(IObserver<TSource> observer, Eager parent)
+                    : base(observer)
+                {
+                    this._parent = parent;
+                }
 
-                lock (_parent._gate)
+                public void Run()
                 {
-                    if (++_parent._count == 1)
+                    base.Run(_parent._source);
+
+                    lock (_parent._gate)
                     {
-                        // We need to set _connectableSubscription to something
-                        // before Connect because if Connect terminates synchronously,
-                        // Dispose(bool) gets executed and will try to dispose
-                        // _connectableSubscription of null.
-                        // ?.Dispose() is no good because the dispose action has to be
-                        // executed anyway.
-                        // We can't inline SAD either because the IDisposable of Connect
-                        // may belong to the wrong connection.
-                        var sad = new SingleAssignmentDisposable();
-                        _parent._connectableSubscription = sad;
-
-                        sad.Disposable = _parent._source.Connect();
+                        if (++_parent._count == 1)
+                        {
+                            // We need to set _connectableSubscription to something
+                            // before Connect because if Connect terminates synchronously,
+                            // Dispose(bool) gets executed and will try to dispose
+                            // _connectableSubscription of null.
+                            // ?.Dispose() is no good because the dispose action has to be
+                            // executed anyway.
+                            // We can't inline SAD either because the IDisposable of Connect
+                            // may belong to the wrong connection.
+                            var sad = new SingleAssignmentDisposable();
+                            _parent._connectableSubscription = sad;
+
+                            sad.Disposable = _parent._source.Connect();
+                        }
+                    }
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+
+                    if (disposing)
+                    {
+                        lock (_parent._gate)
+                        {
+                            if (--_parent._count == 0)
+                            {
+                                _parent._connectableSubscription.Dispose();
+                            }
+                        }
                     }
                 }
             }
+        }
+
+        internal sealed class Lazy : Producer<TSource, Lazy._>
+        {
+            private readonly object _gate;
+            private readonly IScheduler _scheduler;
+            private readonly TimeSpan _disconnectTime;
+            private readonly IConnectableObservable<TSource> _source;
+            private readonly SerialDisposable _serial = new SerialDisposable();
+
+            private int _count;
+            private IDisposable _connectableSubscription;
 
-            protected override void Dispose(bool disposing)
+            public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
             {
-                base.Dispose(disposing);
+                _source = source;
+                _gate = new object();
+                _disconnectTime = disconnectTime;
+                _scheduler = scheduler;
+                _count = 0;
+                _connectableSubscription = default(IDisposable);
+            }
 
-                if (disposing)
+            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
+
+            protected override void Run(_ sink) => sink.Run(this);
+
+            internal sealed class _ : IdentitySink<TSource>
+            {
+                public _(IObserver<TSource> observer)
+                    : base(observer)
                 {
-                    lock (_parent._gate)
+                }
+
+                public void Run(Lazy parent)
+                {
+                    var subscription = parent._source.SubscribeSafe(this);
+
+                    lock (parent._gate)
                     {
-                        if (--_parent._count == 0)
+                        if (++parent._count == 1)
                         {
-                            _parent._connectableSubscription.Dispose();
+                            if (parent._connectableSubscription == null)
+                                parent._connectableSubscription = parent._source.Connect();
+
+                            parent._serial.Disposable = new SingleAssignmentDisposable();
                         }
                     }
+
+                    SetUpstream(Disposable.Create(() =>
+                    {
+                        subscription.Dispose();
+
+                        lock (parent._gate)
+                        {
+                            if (--parent._count == 0)
+                            {
+                                var cancelable = (SingleAssignmentDisposable)parent._serial.Disposable;
+
+                                cancelable.Disposable = parent._scheduler.Schedule(cancelable, parent._disconnectTime, (self, state) =>
+                                {
+                                    lock (parent._gate)
+                                    {
+                                        if (object.ReferenceEquals(parent._serial.Disposable, state))
+                                        {
+                                            parent._connectableSubscription.Dispose();
+                                            parent._connectableSubscription = null;
+                                        }
+                                    }
+
+                                    return Disposable.Empty;
+                                });
+                            }
+                        }
+                    }));
                 }
             }
         }

+ 70 - 2
Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

@@ -1,5 +1,5 @@
-/*
- * WARNING: Auto-generated file (05/28/2018 22:20:18)
+/*
+ * WARNING: Auto-generated file (06/12/2018 13:00:48)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 
@@ -10667,6 +10667,74 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, TimeSpan disconnectDelay)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(TimeSpan))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
+                    Expression.Constant(disconnectDelay, typeof(TimeSpan))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
+        /// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler scheduler)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(TimeSpan), default(IScheduler))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
+                    Expression.Constant(disconnectDelay, typeof(TimeSpan)),
+                    Expression.Constant(scheduler, typeof(IScheduler))
+                )
+            );
+        }
+
         /// <summary>
         /// Generates an observable sequence that repeats the given element infinitely.
         /// </summary>

+ 11 - 1
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs

@@ -68,7 +68,17 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source)
         {
-            return new RefCount<TSource>(source);
+            return new RefCount<TSource>.Eager(source);
+        }
+
+        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime)
+        {
+            return RefCount(source, disconnectTime, Scheduler.Default);
+        }
+
+        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
+        {
+            return new RefCount<TSource>.Lazy(source, disconnectTime, scheduler);
         }
 
         #endregion

+ 4 - 0
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt

@@ -1246,6 +1246,8 @@ namespace System.Reactive.Linq
         public static System.IObservable<int> Range(int start, int count) { }
         public static System.IObservable<int> Range(int start, int count, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TSource> RefCount<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source) { }
+        public static System.IObservable<TSource> RefCount<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source, System.TimeSpan disconnectDelay) { }
+        public static System.IObservable<TSource> RefCount<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source, System.TimeSpan disconnectDelay, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TResult> Repeat<TResult>(TResult value) { }
         public static System.IObservable<TResult> Repeat<TResult>(TResult value, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount) { }
@@ -1963,6 +1965,8 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<int> Range(this System.Reactive.Linq.IQbservableProvider provider, int start, int count) { }
         public static System.Reactive.Linq.IQbservable<int> Range(this System.Reactive.Linq.IQbservableProvider provider, int start, int count, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TSource> RefCount<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source) { }
+        public static System.Reactive.Linq.IQbservable<TSource> RefCount<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source, System.TimeSpan disconnectDelay) { }
+        public static System.Reactive.Linq.IQbservable<TSource> RefCount<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source, System.TimeSpan disconnectDelay, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TResult> Repeat<TResult>(this System.Reactive.Linq.IQbservableProvider provider, TResult value) { }
         public static System.Reactive.Linq.IQbservable<TResult> Repeat<TResult>(this System.Reactive.Linq.IQbservableProvider provider, TResult value, int repeatCount) { }
         public static System.Reactive.Linq.IQbservable<TResult> Repeat<TResult>(this System.Reactive.Linq.IQbservableProvider provider, TResult value, int repeatCount, System.Reactive.Concurrency.IScheduler scheduler) { }

+ 174 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs

@@ -184,5 +184,179 @@ namespace ReactiveTests.Tests
             );
         }
 
+        [Fact]
+        public void LazyRefCount_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, TimeSpan.FromSeconds(2)));
+        }
+
+        [Fact]
+        public void LazyRefCount_ConnectsOnFirst()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable<int>(
+                OnNext(210, 1),
+                OnNext(220, 2),
+                OnNext(230, 3),
+                OnNext(240, 4),
+                OnCompleted<int>(250)
+            );
+
+            var subject = new MySubject();
+
+            var conn = new ConnectableObservable<int>(xs, subject);
+
+            var res = scheduler.Start(() =>
+                conn.RefCount(TimeSpan.FromSeconds(2))
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(210, 1),
+                OnNext(220, 2),
+                OnNext(230, 3),
+                OnNext(240, 4),
+                OnCompleted<int>(250)
+            );
+
+            Assert.True(subject.Disposed);
+        }
+
+        [Fact]
+        public void LazyRefCount_NotConnected()
+        {
+            var scheduler = new TestScheduler();
+            var disconnected = false;
+            var count = 0;
+            var xs = Observable.Defer(() =>
+            {
+                count++;
+                return Observable.Create<int>(obs =>
+                {
+                    return () => { disconnected = true; };
+                });
+            });
+
+            var subject = new MySubject();
+
+            var conn = new ConnectableObservable<int>(xs, subject);
+            var refd = conn.RefCount(TimeSpan.FromTicks(20), scheduler);
+
+            var dis1 = refd.Subscribe();
+            Assert.Equal(1, count);
+            Assert.Equal(1, subject.SubscribeCount);
+            Assert.False(disconnected);
+
+            var dis2 = refd.Subscribe();
+            Assert.Equal(1, count);
+            Assert.Equal(2, subject.SubscribeCount);
+            Assert.False(disconnected);
+
+            dis1.Dispose();
+            Assert.False(disconnected);
+            dis2.Dispose();
+            Assert.False(disconnected);
+
+            scheduler.AdvanceBy(19);
+            Assert.False(disconnected);
+
+            scheduler.AdvanceBy(1);
+            Assert.True(disconnected);
+            disconnected = false;
+
+            var dis3 = refd.Subscribe();
+            Assert.Equal(2, count);
+            Assert.Equal(3, subject.SubscribeCount);
+            Assert.False(disconnected);
+
+            dis3.Dispose();
+            scheduler.AdvanceBy(20);
+            Assert.True(disconnected);
+        }
+
+        [Fact]
+        public void LazyRefCount_OnError()
+        {
+            var ex = new Exception();
+            var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
+
+            var res = xs.Publish().RefCount(TimeSpan.FromSeconds(2));
+
+            res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception());
+            res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception());
+        }
+
+        [Fact]
+        public void LazyRefCount_Publish()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable<int>(
+                OnNext(210, 1),
+                OnNext(220, 2),
+                OnNext(230, 3),
+                OnNext(240, 4),
+                OnNext(250, 5),
+                OnNext(260, 6),
+                OnNext(270, 7),
+                OnNext(280, 8),
+                OnNext(290, 9),
+                OnCompleted<int>(300)
+            );
+
+            var res = xs.Publish().RefCount(TimeSpan.FromTicks(9), scheduler);
+
+            var d1 = default(IDisposable);
+            var o1 = scheduler.CreateObserver<int>();
+            scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
+            scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });
+
+            var d2 = default(IDisposable);
+            var o2 = scheduler.CreateObserver<int>();
+            scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
+            scheduler.ScheduleAbsolute(275, () =>
+            {
+                d2.Dispose();
+            });
+
+            var d3 = default(IDisposable);
+            var o3 = scheduler.CreateObserver<int>();
+            scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
+            scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });
+
+            var d4 = default(IDisposable);
+            var o4 = scheduler.CreateObserver<int>();
+            scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
+            scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });
+
+            scheduler.Start();
+
+            o1.Messages.AssertEqual(
+                OnNext(220, 2),
+                OnNext(230, 3)
+            );
+
+            o2.Messages.AssertEqual(
+                OnNext(230, 3),
+                OnNext(240, 4),
+                OnNext(250, 5),
+                OnNext(260, 6),
+                OnNext(270, 7)
+            );
+
+            o3.Messages.AssertEqual(
+                OnNext(260, 6)
+            );
+
+            o4.Messages.AssertEqual(
+                OnNext(290, 9),
+                OnCompleted<int>(300)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(215, 284),
+                Subscribe(285, 300)
+            );
+        }
     }
 }