Browse Source

4.x: Add the IConnectableObservable.AutoConnect() operator (#497)

David Karnok 7 years ago
parent
commit
3964f9f8c0

+ 1 - 0
Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

@@ -323,6 +323,7 @@ namespace System.Reactive.Linq
         IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window);
         IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize, TimeSpan window, IScheduler scheduler);
         IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window, IScheduler scheduler);
+        IObservable<TSource> AutoConnect<TSource>(IConnectableObservable<TSource> source, int minObservers, Action<IDisposable> onConnect);
 
         #endregion
 

+ 22 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs

@@ -205,6 +205,28 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + AutoConnect +
+
+        /// <summary>
+        /// Automatically connect the upstream IConnectableObservable at most once when the
+        /// specified number of IObservers have subscribed to this IObservable.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="minObservers">The number of observers required to subscribe before the connection to source happens, non-positive value will trigger an immediate subscription.</param>
+        /// <param name="onConnect">If not null, the connection's IDisposable is provided to it.</param>
+        /// <returns>An observable sequence that connects to the source at most once when the given number of observers have subscribed to it.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        public static IObservable<TSource> AutoConnect<TSource>(this IConnectableObservable<TSource> source, int minObservers = 1, Action<IDisposable> onConnect = null)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return s_impl.AutoConnect(source, minObservers, onConnect);
+        }
+
+        #endregion
+
         #region + Replay +
 
         /// <summary>

+ 47 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/AutoConnect.cs

@@ -0,0 +1,47 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Reactive.Subjects;
+using System.Threading;
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    /// <summary>
+    /// Automatically connect the upstream IConnectableObservable once the
+    /// specified number of IObservers have subscribed to this IObservable.
+    /// </summary>
+    /// <typeparam name="T">The upstream value type.</typeparam>
+    internal sealed class AutoConnect<T> : IObservable<T>
+    {
+        readonly IConnectableObservable<T> source;
+
+        readonly int minObservers;
+
+        readonly Action<IDisposable> onConnect;
+
+        int count;
+
+        internal AutoConnect(IConnectableObservable<T> source, int minObservers, Action<IDisposable> onConnect)
+        {
+            this.source = source;
+            this.minObservers = minObservers;
+            this.onConnect = onConnect;
+        }
+
+        public IDisposable Subscribe(IObserver<T> observer)
+        {
+            var d = source.Subscribe(observer);
+
+            if (Volatile.Read(ref count) < minObservers)
+            {
+                if (Interlocked.Increment(ref count) == minObservers)
+                {
+                    var c = source.Connect();
+                    onConnect?.Invoke(c);
+                }
+            }
+            return d;
+        }
+    }
+}

+ 34 - 1
Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

@@ -10627,7 +10627,40 @@ namespace System.Reactive.Linq
                 )
             );
         }
-        
+
+        /// <summary>
+        /// Automatically connect the upstream IConnectableObservable at most once when the
+        /// specified number of IObservers have subscribed to this IObservable.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Connectable observable sequence.</param>
+        /// <param name="minObservers">The number of observers required to subscribe before the connection to source happens, non-positive value will trigger an immediate subscription.</param>
+        /// <param name="onConnect">If not null, the connection's IDisposable is provided to it.</param>
+        /// <returns>An observable sequence that connects to the source at most once when the given number of observers have subscribed to it.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        public static IQbservable<TSource> AutoConnect<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, int minObservers, Action<IDisposable> onConnect)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.AutoConnect<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(int), default(Action<IDisposable>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
+                    Expression.Constant(minObservers, typeof(int)),
+                    Expression.Constant(onConnect, typeof(Action<IDisposable>))
+                )
+            );
+        }
+
         /// <summary>
         /// Generates an observable sequence that repeats the given element infinitely.
         /// </summary>

+ 17 - 0
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs

@@ -71,6 +71,23 @@ namespace System.Reactive.Linq
             return new RefCount<TSource>(source);
         }
 
+        #endregion
+
+        #region + AutoConnect +
+
+        public virtual IObservable<TSource> AutoConnect<TSource>(IConnectableObservable<TSource> source, int minObservers = 1, Action<IDisposable> onConnect = null)
+        {
+            if (minObservers <= 0)
+            {
+                var d = source.Connect();
+                onConnect?.Invoke(d);
+                return source;
+            }
+
+            return new AutoConnect<TSource>(source, minObservers, onConnect);
+        }
+
+
         #endregion
 
         #region + Replay +

+ 140 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/ObservableAutoConnectTest.cs

@@ -0,0 +1,140 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using Microsoft.Reactive.Testing;
+using Xunit;
+using ReactiveTests.Dummies;
+using System.Reflection;
+using System.Reactive.Subjects;
+
+namespace ReactiveTests.Tests
+{
+    public class ObservableAutoConnectTest : ReactiveTest
+    {
+        [Fact]
+        public void AutoConnect_Basic()
+        {
+            int called = 0;
+
+            var source = Observable.Defer(() =>
+            {
+                called++;
+                return Observable.Range(1, 5);
+            })
+            .Replay()
+            .AutoConnect();
+
+            Assert.Equal(0, called);
+
+            var list = source.ToList().First();
+
+            Assert.Equal(1, called);
+            Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
+
+            list = source.ToList().First();
+
+            Assert.Equal(1, called);
+            Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
+        }
+
+        [Fact]
+        public void AutoConnect_Immediately()
+        {
+            int called = 0;
+
+            var source = Observable.Defer(() =>
+            {
+                called++;
+                return Observable.Range(1, 5);
+            })
+            .Replay()
+            .AutoConnect(0);
+
+            Assert.Equal(1, called);
+
+            var list = source.ToList().First();
+
+            Assert.Equal(1, called);
+            Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
+
+            list = source.ToList().First();
+
+            Assert.Equal(1, called);
+            Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
+        }
+
+        [Fact]
+        public void AutoConnect_TwoConsumers()
+        {
+            int called = 0;
+
+            var source = Observable.Defer(() =>
+            {
+                called++;
+                return Observable.Range(1, 5);
+            })
+            .Replay()
+            .AutoConnect(2);
+
+            Assert.Equal(0, called);
+
+            var list0 = new List<int>();
+
+            source.Subscribe(v => list0.Add(v));
+
+            Assert.Equal(0, called);
+            Assert.Equal(0, list0.Count);
+
+            var list = source.ToList().First();
+
+            Assert.Equal(1, called);
+            Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
+
+            Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list0);
+
+            list = source.ToList().First();
+
+            Assert.Equal(1, called);
+            Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
+        }
+
+        [Fact]
+        public void AutoConnect_Dispose()
+        {
+            var subject = new Subject<int>();
+
+            var disposable = new IDisposable[1];
+
+            var source = subject
+            .Replay()
+            .AutoConnect(1, d => disposable[0] = d);
+
+            Assert.Null(disposable[0]);
+
+            var list = new List<int>();
+
+            source.Subscribe(v => list.Add(v));
+
+            Assert.NotNull(disposable[0]);
+
+            subject.OnNext(1);
+            subject.OnNext(2);
+            subject.OnNext(3);
+
+            disposable[0].Dispose();
+
+            subject.OnNext(4);
+            subject.OnNext(5);
+
+            Assert.Equal(new List<int>() { 1, 2, 3 }, list);
+
+        }
+    }
+}