Browse Source

Add RefCount overloads with minimum observer count requirement

Dávid Karnok 6 years ago
parent
commit
a18844d316

+ 3 - 0
Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

@@ -309,6 +309,9 @@ namespace System.Reactive.Linq
         IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source);
         IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay);
         IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler schedulder);
+        IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers);
+        IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay);
+        IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay, IScheduler schedulder);
         IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source);
         IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, IScheduler scheduler);
         IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);

+ 88 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs

@@ -291,6 +291,94 @@ namespace System.Reactive.Linq
             return s_impl.RefCount(source, disconnectDelay, scheduler);
         }
 
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="minObservers">The minimum number of observers subscribing to establish the connection to the source.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
+        public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, int minObservers)
+        {
+            if (source == null)
+            {
+                throw new ArgumentNullException(nameof(source));
+            }
+            if (minObservers <= 0)
+            {
+                throw new ArgumentOutOfRangeException(nameof(minObservers));
+            }
+
+            return s_impl.RefCount(source, minObservers);
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="minObservers">The minimum number of observers subscribing to establish the connection to the source.</param>
+        /// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
+        public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay)
+        {
+            if (source == null)
+            {
+                throw new ArgumentNullException("source");
+            }
+
+            if (disconnectDelay < TimeSpan.Zero)
+            {
+                throw new ArgumentException("disconnectDelay");
+            }
+            if (minObservers <= 0)
+            {
+                throw new ArgumentOutOfRangeException(nameof(minObservers));
+            }
+
+            return s_impl.RefCount(source, minObservers, disconnectDelay);
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="minObservers">The minimum number of observers subscribing to establish the connection to the source.</param>
+        /// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
+        /// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
+        public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay, IScheduler scheduler)
+        {
+            if (source == null)
+            {
+                throw new ArgumentNullException("source");
+            }
+
+            if (scheduler == null)
+            {
+                throw new ArgumentNullException("scheduler");
+            }
+
+            if (disconnectDelay < TimeSpan.Zero)
+            {
+                throw new ArgumentException("disconnectDelay");
+            }
+            if (minObservers <= 0)
+            {
+                throw new ArgumentOutOfRangeException(nameof(minObservers));
+            }
+
+
+            return s_impl.RefCount(source, minObservers, disconnectDelay, scheduler);
+        }
+
         #endregion
 
         #region + AutoConnect +

+ 10 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -23,10 +23,13 @@ namespace System.Reactive.Linq.ObservableImpl
             /// </summary>
             private RefConnection _connection;
 
-            public Eager(IConnectableObservable<TSource> source)
+            private readonly int _minObservers;
+
+            public Eager(IConnectableObservable<TSource> source, int minObservers)
             {
                 _source = source;
                 _gate = new object();
+                _minObservers = minObservers;
             }
 
             protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
@@ -68,7 +71,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
 
                         // this is the first observer, then connect
-                        doConnect = conn._count++ == 0;
+                        doConnect = ++conn._count == _parent._minObservers;
                         // save the current connection for this observer
                         _targetConnection = conn;
                     }
@@ -132,16 +135,19 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly IScheduler _scheduler;
             private readonly TimeSpan _disconnectTime;
             private readonly IConnectableObservable<TSource> _source;
+            private readonly int _minObservers;
+
             private IDisposable _serial;
             private int _count;
             private IDisposable _connectableSubscription;
 
-            public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
+            public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers)
             {
                 _source = source;
                 _gate = new object();
                 _disconnectTime = disconnectTime;
                 _scheduler = scheduler;
+                _minObservers = minObservers;
             }
 
             protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
@@ -161,7 +167,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     lock (parent._gate)
                     {
-                        if (++parent._count == 1)
+                        if (++parent._count == parent._minObservers)
                         {
                             if (parent._connectableSubscription == null)
                             {

+ 113 - 0
Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

@@ -10854,6 +10854,119 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="minObservers">The minimum number of observers subscribing to establish the connection to the source.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
+        public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, int minObservers)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (minObservers <= 0)
+                throw new ArgumentOutOfRangeException(nameof(minObservers));
+
+            return provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(int))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
+                    Expression.Constant(minObservers, typeof(int))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="minObservers">The minimum number of observers subscribing to establish the connection to the source.</param>
+        /// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
+        public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (minObservers <= 0)
+                throw new ArgumentOutOfRangeException(nameof(minObservers));
+
+            return provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(int), default(TimeSpan))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
+                    Expression.Constant(minObservers, typeof(int)),
+                    Expression.Constant(disconnectDelay, typeof(TimeSpan))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="minObservers">The minimum number of observers subscribing to establish the connection to the source.</param>
+        /// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
+        /// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
+        /// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
+        public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay, IScheduler scheduler)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+            if (minObservers <= 0)
+                throw new ArgumentOutOfRangeException(nameof(minObservers));
+
+            return provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(int), default(TimeSpan), default(IScheduler))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
+                    Expression.Constant(minObservers, typeof(int)),
+                    Expression.Constant(disconnectDelay, typeof(TimeSpan)),
+                    Expression.Constant(scheduler, typeof(IScheduler))
+                )
+            );
+        }
+
         /// <summary>
         /// Generates an observable sequence that repeats the given element infinitely.
         /// </summary>

+ 17 - 2
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs

@@ -67,7 +67,7 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source)
         {
-            return new RefCount<TSource>.Eager(source);
+            return RefCount(source, 1);
         }
 
         public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime)
@@ -77,7 +77,22 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
         {
-            return new RefCount<TSource>.Lazy(source, disconnectTime, scheduler);
+            return RefCount(source, 1, disconnectTime, scheduler);
+        }
+
+        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers)
+        {
+            return new RefCount<TSource>.Eager(source, minObservers);
+        }
+
+        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectTime)
+        {
+            return RefCount(source, minObservers, disconnectTime, Scheduler.Default);
+        }
+
+        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectTime, IScheduler scheduler)
+        {
+            return new RefCount<TSource>.Lazy(source, disconnectTime, scheduler, minObservers);
         }
 
         #endregion

+ 6 - 0
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.cs

@@ -1271,8 +1271,11 @@ namespace System.Reactive.Linq
         public static System.IObservable<int> Range(int start, int count) { }
         public static System.IObservable<int> Range(int start, int count, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TSource> RefCount<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source) { }
+        public static System.IObservable<TSource> RefCount<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers) { }
         public static System.IObservable<TSource> RefCount<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source, System.TimeSpan disconnectDelay) { }
+        public static System.IObservable<TSource> RefCount<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers, System.TimeSpan disconnectDelay) { }
         public static System.IObservable<TSource> RefCount<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source, System.TimeSpan disconnectDelay, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<TSource> RefCount<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers, System.TimeSpan disconnectDelay, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TResult> Repeat<TResult>(TResult value) { }
         public static System.IObservable<TSource> Repeat<TSource>(this System.IObservable<TSource> source) { }
         public static System.IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount) { }
@@ -1995,8 +1998,11 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<int> Range(this System.Reactive.Linq.IQbservableProvider provider, int start, int count) { }
         public static System.Reactive.Linq.IQbservable<int> Range(this System.Reactive.Linq.IQbservableProvider provider, int start, int count, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TSource> RefCount<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source) { }
+        public static System.Reactive.Linq.IQbservable<TSource> RefCount<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers) { }
         public static System.Reactive.Linq.IQbservable<TSource> RefCount<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source, System.TimeSpan disconnectDelay) { }
+        public static System.Reactive.Linq.IQbservable<TSource> RefCount<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers, System.TimeSpan disconnectDelay) { }
         public static System.Reactive.Linq.IQbservable<TSource> RefCount<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source, System.TimeSpan disconnectDelay, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<TSource> RefCount<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers, System.TimeSpan disconnectDelay, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TSource> Repeat<TSource>(this System.Reactive.Linq.IQbservable<TSource> source) { }
         public static System.Reactive.Linq.IQbservable<TResult> Repeat<TResult>(this System.Reactive.Linq.IQbservableProvider provider, TResult value) { }
         public static System.Reactive.Linq.IQbservable<TSource> Repeat<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, int repeatCount) { }

+ 101 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System;
+using System.Collections.Generic;
 using System.Reactive;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
@@ -436,5 +437,105 @@ namespace ReactiveTests.Tests
             Assert.Equal(1, subscribed);
             Assert.Equal(1, unsubscribed);
         }
+
+        [Fact]
+        public void RefCount_minObservers_not_connected_Eager()
+        {
+            int connected = 0;
+            var source = Observable.Defer(() =>
+            {
+                connected++;
+                return Observable.Never<int>();
+            })
+            .Publish()
+            .RefCount(2);
+
+            Assert.Equal(0, connected);
+
+            source.Subscribe();
+
+            Assert.Equal(0, connected);
+        }
+
+        [Fact]
+        public void RefCount_minObservers_connected_Eager()
+        {
+            var connected = 0;
+            var source = Observable.Defer(() =>
+            {
+                connected++;
+                return Observable.Range(1, 5);
+            })
+            .Publish()
+            .RefCount(2);
+
+            Assert.Equal(0, connected);
+
+            var list1 = new List<int>();
+            source.Subscribe(list1.Add);
+
+            Assert.Equal(0, connected);
+            Assert.Empty(list1);
+
+            var list2 = new List<int>();
+            source.Subscribe(list2.Add);
+
+            Assert.Equal(1, connected);
+
+            var expected = new List<int>(new[] { 1, 2, 3, 4, 5 });
+
+            Assert.Equal(expected, list1);
+            Assert.Equal(expected, list2);
+        }
+
+        [Fact]
+        public void RefCount_minObservers_not_connected_Lazy()
+        {
+            int connected = 0;
+            var source = Observable.Defer(() =>
+            {
+                connected++;
+                return Observable.Never<int>();
+            })
+            .Publish()
+            .RefCount(2, TimeSpan.FromMinutes(1));
+
+            Assert.Equal(0, connected);
+
+            source.Subscribe();
+
+            Assert.Equal(0, connected);
+        }
+
+        [Fact]
+        public void RefCount_minObservers_connected_Lazy()
+        {
+            var connected = 0;
+            var source = Observable.Defer(() =>
+            {
+                connected++;
+                return Observable.Range(1, 5);
+            })
+            .Publish()
+            .RefCount(2, TimeSpan.FromMinutes(1));
+
+            Assert.Equal(0, connected);
+
+            var list1 = new List<int>();
+            source.Subscribe(list1.Add);
+
+            Assert.Equal(0, connected);
+            Assert.Empty(list1);
+
+            var list2 = new List<int>();
+            source.Subscribe(list2.Add);
+
+            Assert.Equal(1, connected);
+
+            var expected = new List<int>(new[] { 1, 2, 3, 4, 5 });
+
+            Assert.Equal(expected, list1);
+            Assert.Equal(expected, list2);
+        }
     }
 }