// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System; using System.Collections.Generic; using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using Microsoft.Reactive.Testing; using Microsoft.VisualStudio.TestTools.UnitTesting; using Assert = Xunit.Assert; namespace ReactiveTests.Tests { [TestClass] public class RefCountTest : ReactiveTest { /// /// A connectable observable that provides an individual notification upon connection, where /// the notification can be different from one connection to the next. /// /// Element type. /// /// /// The most important capability this provides is to be able to provide values after /// having completed. Obviously it won't do that for any single subscription because that /// would break the basic Rx contract, but this can deliver completion to some subscribers, /// and then go on to deliver values to subsequent subscribers. (The connectable /// observables returned by Publish can't do this: once their subject has delivered /// a completion notification it can't deliver anything else, not even to new subscribers. /// That's why we need a specialized type.) /// /// private sealed class SerialSingleNotificationConnectable : IConnectableObservable { private readonly object _gate = new(); private Notification _notificationAtNextConnect; private Subject _sourceForNextConnect = new(); private Connection _nextConnectionInProgress; public SerialSingleNotificationConnectable(Notification initialNotificationAtNextConnect) { _notificationAtNextConnect = initialNotificationAtNextConnect; _nextConnectionInProgress = new(_sourceForNextConnect); } public List Connections { get; } = new(); private Connection ActiveConnection => (Connections.Count > 0 && Connections[Connections.Count - 1] is Connection { Disposed: false } activeConnection) ? activeConnection : null; private Connection CurrentConnection => ActiveConnection ?? _nextConnectionInProgress; public void SetNotificationForNextConnect(Notification notification) { _notificationAtNextConnect = notification; } public void DeliverNotificationForActiveConnection(Notification notification) { if (ActiveConnection is not Connection activeConnection) { throw new InvalidOperationException("No connection is currently active"); } if (activeConnection.Source is not Subject source) { throw new InvalidOperationException("Active connection's source has been replaced and is no longer a Subject, so it is not possible to deliver further notifications to current subscribers"); } notification.Accept(source); } public IDisposable Connect() { Connection connecting; Notification notification; Subject source; lock (_gate) { connecting = _nextConnectionInProgress; notification = _notificationAtNextConnect; source = _sourceForNextConnect; _sourceForNextConnect = new Subject(); _nextConnectionInProgress = new(_sourceForNextConnect); Connections.Add(connecting); } notification.Accept(source); return connecting; } public IDisposable Subscribe(IObserver observer) { Connection connection; lock (_gate) { connection = CurrentConnection; } return connection.Source.Subscribe(observer); } public sealed class Connection(IObservable source) : IDisposable { /// /// Gets a value indicating whether this connection has been disposed. /// public bool Disposed { get; private set; } public IObservable Source { get; private set; } = source; /// /// In scenarios where has entered a completed state, this /// replaces it with a new source so if further subscribers to the same connection /// come along, tests can deliver notifications to those. /// /// /// Without this method, will /// deliver events only when is called, meaning that only /// observers that subscribed before that call will receive any notifications /// (unless the notification was OnComplete, in which case the subject /// enters a completed state, and completes all further subscribers). This enables /// tests to create scenarios where subscriptions made after Connect (and /// before that connection is disposed) can receive further notifications. /// public void ReplaceSource(IObservable source) { Source = source; } public void Dispose() { Disposed = true; } } } /// /// A connectable observable that logs calls to but otherwise ignores /// them, forwarding calls to the current underlying source (which /// can be changed over time). /// /// Element type. /// /// /// This is similar to , in that the /// underlying source can be changed over time, making it possible for this to complete /// observers, but then revert to a state where subsequent observers will not be completed. /// But this also enables simulation of unusual (but not strictly disallowed) behaviour, /// in which subscribers will receive notifications before calling . /// It's useful to be able to do this because it can happen in more normal setups when /// sources completed synchronously, and it's easy to handle this incorrectly. /// /// private sealed class SerialConnectableIgnoringConnect : IConnectableObservable { private IObservable _source; public SerialConnectableIgnoringConnect(IObservable initialSource) { _source = initialSource; } public void SetSource(IObservable source) { _source = source; } public List Connections { get; } = new(); public IDisposable Connect() { var connection = new Connection(); Connections.Add(connection); return connection; } public IDisposable Subscribe(IObserver observer) { return _source.Subscribe(observer); } public sealed class Connection() : IDisposable { /// /// Gets a value indicating whether this connection has been disposed. /// public bool Disposed { get; private set; } public void Dispose() { Disposed = true; } } } #region Immediate Disconnect [TestMethod] public void RefCount_NoDelay_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.RefCount(null)); ReactiveAssert.Throws(() => Observable.RefCount(null, 2)); ReactiveAssert.Throws(() => Observable.RefCount(Observable.Never().Publish(), 0)); ReactiveAssert.Throws(() => Observable.RefCount(Observable.Never().Publish(), -1)); ReactiveAssert.Throws(() => Observable.RefCount(Observable.Never().Publish(), -2)); } [TestMethod] public void RefCount_NoDelay_ConnectsOnFirst() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnCompleted(250) ); var subject = new MySubject(); var conn = new ConnectableObservable(xs, subject); var res = scheduler.Start(() => conn.RefCount() ); res.Messages.AssertEqual( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnCompleted(250) ); Assert.True(subject.Disposed); } [TestMethod] public void RefCount_NoDelay_minObservers_ConnectsOnObserverThresholdReached() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnCompleted(250) ); var subject = new MySubject(); var conn = new ConnectableObservable(xs, subject); var res = conn.RefCount(2); var d1 = default(IDisposable); var o1 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(205, () => { d1 = res.Subscribe(o1); }); var d2 = default(IDisposable); var o2 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); }); scheduler.Start(); o1.Messages.AssertEqual( OnNext(230, 3), OnNext(240, 4), OnCompleted(250) ); Assert.True(subject.Disposed); } [TestMethod] public void RefCount_NoDelay_SourceProducesValuesAndCompletesInConnect() { var connected = 0; var source = Observable.Defer(() => { connected++; return Observable.Range(1, 5); }) .Publish() .RefCount(); Assert.Equal(0, connected); var list1 = new List(); source.Subscribe(list1.Add); Assert.Equal(1, connected); List expected1 = [1, 2, 3, 4, 5]; Assert.Equal(expected1, list1); var list2 = new List(); source.Subscribe(list2.Add); Assert.Equal(1, connected); Assert.Empty(list2); } [TestMethod] public void RefCount_NoDelay_minObservers_SourceProducesValuesAndCompletesInConnect() { var connected = 0; var source = Observable.Defer(() => { connected++; return Observable.Range(1, 5); }) .Publish() .RefCount(2); Assert.Equal(0, connected); var list1 = new List(); source.Subscribe(list1.Add); Assert.Equal(0, connected); Assert.Empty(list1); var list2 = new List(); source.Subscribe(list2.Add); Assert.Equal(1, connected); List expected = [1, 2, 3, 4, 5]; Assert.Equal(expected, list1); Assert.Equal(expected, list2); } [TestMethod] public void RefCount_NoDelay_SourceCompletesWithNoValuesInConnect() { var connectable = new SerialSingleNotificationConnectable(Notification.CreateOnCompleted()); var refCount = connectable.RefCount(); var s1 = refCount.Subscribe(); Assert.Equal(1, connectable.Connections.Count); // Since the source immediately completed, the RefCount goes back to zero subscribers // inside the call to Connect, so we expect to be disconnected. Assert.True(connectable.Connections[0].Disposed); var s2 = refCount.Subscribe(); Assert.Equal(2, connectable.Connections.Count); Assert.True(connectable.Connections[1].Disposed); } [TestMethod] public void RefCount_NoDelay_minObservers_SourceCompletesWithNoValuesInConnect() { var connectable = new SerialSingleNotificationConnectable(Notification.CreateOnCompleted()); var refCount = connectable.RefCount(2); var s1 = refCount.Subscribe(); Assert.Equal(0, connectable.Connections.Count); var s2 = refCount.Subscribe(); Assert.Equal(1, connectable.Connections.Count); // Since the source completes immediately, we will have no active subscribers, so // we expect to be disconnected. Assert.True(connectable.Connections[0].Disposed); s1.Dispose(); s2.Dispose(); // Disposing subscriptions should change nothing because they self-completed. Assert.Equal(1, connectable.Connections.Count); // We're now back in the initial disconnected state, so nothing more should // happen until we get up to minObservers. var s3 = refCount.Subscribe(); Assert.Equal(1, connectable.Connections.Count); var s4 = refCount.Subscribe(); Assert.Equal(2, connectable.Connections.Count); Assert.True(connectable.Connections[1].Disposed); } [TestMethod] public void RefCount_NoDelay_NotConnected() { var disconnected = false; var count = 0; var xs = Observable.Defer(() => { count++; return Observable.Create(obs => { return () => { disconnected = true; }; }); }); var subject = new MySubject(); var conn = new ConnectableObservable(xs, subject); var refd = conn.RefCount(); var dis1 = refd.Subscribe(); Assert.Equal(1, count); Assert.Equal(1, subject.SubscribeCount); Assert.False(disconnected); var dis2 = refd.Subscribe(); Assert.Equal(1, count); Assert.Equal(2, subject.SubscribeCount); Assert.False(disconnected); dis1.Dispose(); Assert.False(disconnected); dis2.Dispose(); Assert.True(disconnected); disconnected = false; var dis3 = refd.Subscribe(); Assert.Equal(2, count); Assert.Equal(3, subject.SubscribeCount); Assert.False(disconnected); dis3.Dispose(); Assert.True(disconnected); } [TestMethod] public void RefCount_NoDelay_minObservers_NotConnected() { var connected = 0; var source = Observable.Defer(() => { connected++; return Observable.Never(); }) .Publish() .RefCount(2); Assert.Equal(0, connected); source.Subscribe(); Assert.Equal(0, connected); } [TestMethod] public void RefCount_NoDelay_OnError() { var ex = new Exception(); var xs = Observable.Throw(ex, Scheduler.Immediate); var res = xs.Publish().RefCount(); res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); }); res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); }); } [TestMethod] public void RefCount_NoDelay_minObservers_OnError() { var ex = new Exception(); var xs = Observable.Throw(ex, Scheduler.Immediate); var res = xs.Publish().RefCount(2); var exceptionsReceived = new List(); void AddSubscriber() { res.Subscribe( _ => { Assert.Fail("OnNext unexpected"); }, ex_ => { exceptionsReceived.Add(ex); }, () => { Assert.Fail("OnComplete unexpected"); }); } AddSubscriber(); Assert.Equal(0, exceptionsReceived.Count); AddSubscriber(); Assert.Equal(2, exceptionsReceived.Count); Assert.Same(ex, exceptionsReceived[0]); Assert.Same(ex, exceptionsReceived[1]); } [TestMethod] public void RefCount_NoDelay_HotSourceMultipleSubscribers() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnNext(270, 7), OnNext(280, 8), OnNext(290, 9), OnCompleted(300) ); var res = xs.Publish().RefCount(); var d1 = default(IDisposable); var o1 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); }); scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); }); var d2 = default(IDisposable); var o2 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); }); scheduler.ScheduleAbsolute(275, () => { d2.Dispose(); }); var d3 = default(IDisposable); var o3 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); }); scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); }); var d4 = default(IDisposable); var o4 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); }); scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); }); scheduler.Start(); o1.Messages.AssertEqual( OnNext(220, 2), OnNext(230, 3) ); o2.Messages.AssertEqual( OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnNext(270, 7) ); o3.Messages.AssertEqual( OnNext(260, 6) ); o4.Messages.AssertEqual( OnNext(290, 9), OnCompleted(300) ); xs.Subscriptions.AssertEqual( Subscribe(215, 275), Subscribe(285, 300) ); } [TestMethod] public void RefCount_NoDelay_minObservers_HotSourceMultipleSubscribers() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), // 0 subscribers OnNext(220, 2), // 1 subscriber OnNext(230, 3), // 2 subscribers OnNext(240, 4), // 1 subscriber OnNext(250, 5), // 1 subscriber OnNext(260, 6), // 2 subscribers OnNext(270, 7), // 1 subscribers OnNext(280, 8), // 0 subscribers OnNext(290, 9), // 1 subscribers OnNext(300, 10), // 2 subscribers OnCompleted(310) ); var res = xs.Publish().RefCount(2); var d1 = default(IDisposable); var o1 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); }); scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); }); var d2 = default(IDisposable); var o2 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); }); scheduler.ScheduleAbsolute(275, () => { d2.Dispose(); }); var d3 = default(IDisposable); var o3 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); }); scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); }); var d4 = default(IDisposable); var o4 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); }); scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); }); var d5 = default(IDisposable); var o5 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(295, () => { d5 = res.Subscribe(o5); }); scheduler.ScheduleAbsolute(320, () => { d5.Dispose(); }); scheduler.Start(); o1.Messages.AssertEqual( OnNext(230, 3) ); o2.Messages.AssertEqual( OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnNext(270, 7) ); o3.Messages.AssertEqual( OnNext(260, 6) ); o4.Messages.AssertEqual( OnNext(300, 10), OnCompleted(310) ); o5.Messages.AssertEqual( OnNext(300, 10), OnCompleted(310) ); xs.Subscriptions.AssertEqual( Subscribe(225, 275), Subscribe(295, 310) ); } [TestMethod] public void RefCount_NoDelay_minObservers_SubscriptionsDropBelowThresholdButNotToZero() { var subject = new ReplaySubject(5); var connected = 0; var source = Observable.Defer(() => { connected++; return subject; }) .Publish().RefCount(2); subject.OnNext(1); Assert.Equal(0, connected); var list1 = new List(); var sub1 = source.Subscribe(list1.Add); Assert.Equal(0, connected); Assert.Empty(list1); subject.OnNext(2); var list2 = new List(); var sub2 = source.Subscribe(list2.Add); // Since connection only occurred with the 2nd subscriber, we expect both to get everything // the ReplaySubject has stored. List expectedSub1 = [1, 2]; var expectedSub2 = expectedSub1; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub1, list2); Assert.Equal(1, connected); subject.OnNext(3); // Both subscribers should have received the new item. expectedSub1 = expectedSub2 = [1, 2, 3]; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(1, connected); var list3 = new List(); source.Subscribe(list3.Add); // Since we were already connected, the 3rd subscriber just gets added to the observers of // the Publish multicast output, and no new connection should occur to the underlying ReplaySubject. // So for this 3rd subscription, no new items should be received by any of the subscribers List expectedSub3 = []; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); Assert.Equal(1, connected); subject.OnNext(4); // All the current subscribers should have received that latest item. expectedSub1 = expectedSub2 = [1, 2, 3, 4]; expectedSub3 = [4]; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); Assert.Equal(1, connected); sub1.Dispose(); subject.OnNext(5); // The two remaining subscribers should have received that new item, but the one that just // unsubscribed should not. expectedSub1 = [1, 2, 3, 4]; expectedSub2 = [1, 2, 3, 4, 5]; expectedSub3 = [4, 5]; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); Assert.Equal(1, connected); sub2.Dispose(); subject.OnNext(6); // We are now below the minObservers threshold of 2, but that threshold only governs when we move // from a disconnected state to a connected state. We should remain connected as long as there is // at least one subscriber, so we expect the remaining subscriber to receive that last item. expectedSub1 = [1, 2, 3, 4]; expectedSub2 = [1, 2, 3, 4, 5]; expectedSub3 = [4, 5, 6]; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); Assert.Equal(1, connected); } [TestMethod] public void RefCount_NoDelay_SubscriptionsDropBelowThresholdAndThenBackAbove() { var sourceAfterInitial = new Subject(); var connected = 0; var source = Observable.Defer(() => { connected++; return Observable.Range(1, 5).Concat(sourceAfterInitial); }) .Publish() .RefCount(2); Assert.Equal(0, connected); var list1 = new List(); var sub1 = source.Subscribe(list1.Add); Assert.Equal(0, connected); Assert.Empty(list1); var list2 = new List(); var sub2 = source.Subscribe(list2.Add); Assert.Equal(1, connected); sourceAfterInitial.OnNext(6); sub1.Dispose(); sourceAfterInitial.OnNext(7); Assert.Equal(1, connected); var list3 = new List(); var sub3 = source.Subscribe(list3.Add); // This is the distinguishing feature of this test. With that last subscription, we went from 1 // subscriber (below minObservers) but still connected (because we already hit minObservers once // and never dropped to zero), and now we're passing through minObservers again. We used to have // a bug where we would erroneously attempt to reconnect at this point. Assert.Equal(1, connected); sourceAfterInitial.OnNext(8); var expectedSub1 = new List([1, 2, 3, 4, 5, 6]); var expectedSub2 = new List([1, 2, 3, 4, 5, 6, 7, 8]); var expectedSub3 = new List([8]); Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); } [TestMethod] public void RefCount_NoDelay_ValuesDuringAndAfterSubscribe() { var subject = new ReplaySubject(5); var source = subject.Publish().RefCount(); subject.OnNext(1); // Although the source is a ReplaySubject, the use of Publish means there will only be // a single subscription to the ReplaySubject, so it will only replay one. (It will replay // that first value on the initial connect.) So we expect each subscriber to see fewer and // fewer values. // all subscribers will see all the values List expected1 = [1]; var list1 = new List(); source.Subscribe(list1.Add); Assert.Equal(expected1, list1); subject.OnNext(2); var list2 = new List(); source.Subscribe(list2.Add); expected1 = [1, 2]; List expected2 = []; Assert.Equal(expected1, list1); Assert.Equal(expected2, list2); subject.OnNext(3); var list3 = new List(); source.Subscribe(list3.Add); expected1 = [1, 2, 3]; expected2 = [3]; List expected3 = []; Assert.Equal(expected1, list1); Assert.Equal(expected2, list2); Assert.Equal(expected3, list3); subject.OnNext(4); expected1 = [1, 2, 3, 4]; expected2 = [3, 4]; expected3 = [4]; Assert.Equal(expected1, list1); Assert.Equal(expected2, list2); Assert.Equal(expected3, list3); } [TestMethod] public void RefCount_NoDelay_minObservers_ValuesDuringAndAfterSubscribe() { var subject = new ReplaySubject(5); var source = subject.Publish().RefCount(2); subject.OnNext(1); var list1 = new List(); source.Subscribe(list1.Add); Assert.Empty(list1); subject.OnNext(2); List expected1and2 = [1, 2]; var list2 = new List(); source.Subscribe(list2.Add); Assert.Equal(expected1and2, list1); Assert.Equal(expected1and2, list2); subject.OnNext(3); expected1and2 = [1, 2, 3]; Assert.Equal(expected1and2, list1); Assert.Equal(expected1and2, list2); var list3 = new List(); source.Subscribe(list3.Add); List expected3 = []; Assert.Equal(expected1and2, list1); Assert.Equal(expected1and2, list2); Assert.Equal(expected3, list3); subject.OnNext(4); expected1and2 = [1, 2, 3, 4]; expected3 = [4]; Assert.Equal(expected1and2, list1); Assert.Equal(expected1and2, list2); Assert.Equal(expected3, list3); } [TestMethod] public void RefCount_NoDelay_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletion() { var seen = 0; var terminated = false; // On initial subscription, the source will produce one value and will not complete. var connectable = new SerialSingleNotificationConnectable(Notification.CreateOnNext(36)); var refCount = connectable.RefCount(); using (refCount.Subscribe(value => seen = value, () => terminated = true)) { Assert.Equal(36, seen); } seen = 0; terminated = false; // This time around, the source will complete when subscribed to. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted()); using (refCount.Subscribe(value => seen = value, () => terminated = true)) { Assert.Equal(0, seen); Assert.True(terminated); } seen = 0; terminated = false; // Now we go back to the initial behaviour in which the source produces one value and does not complete. connectable.SetNotificationForNextConnect(Notification.CreateOnNext(42)); using (refCount.Subscribe(value => seen = value, () => terminated = true)) { Assert.Equal(42, seen); Assert.False(terminated); } } [TestMethod] public void RefCount_NoDelay_minObservers_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletion() { var seen1 = 0; var seen2 = 0; var terminated1 = false; var terminated2 = false; // On initial subscription, the source will produce one value and will not complete. var connectable = new SerialSingleNotificationConnectable(Notification.CreateOnNext(36)); var refCount = connectable.RefCount(2); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.Equal(0, seen1); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(36, seen1); Assert.Equal(36, seen2); } } seen1 = seen2 = 0; terminated1 = terminated2 = false; // This time around, the source will complete when subscribed to. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted()); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.False(terminated1); Assert.False(terminated2); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(0, seen1); Assert.Equal(0, seen2); Assert.True(terminated1); Assert.True(terminated2); } } seen1 = seen2 = 0; terminated1 = terminated2 = false; // Now we go back to the initial behaviour in which the source produces one value and does not complete. connectable.SetNotificationForNextConnect(Notification.CreateOnNext(42)); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.Equal(0, seen1); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(42, seen1); Assert.Equal(42, seen2); Assert.False(terminated1); Assert.False(terminated2); } } } #endregion #region Delayed Disconnect [TestMethod] public void RefCount_DelayedDisconnect_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.RefCount(null, TimeSpan.FromSeconds(2))); ReactiveAssert.Throws(() => Observable.RefCount(null, TimeSpan.FromSeconds(2), Scheduler.Default)); ReactiveAssert.Throws(() => Observable.RefCount(null, 2, TimeSpan.FromSeconds(2))); ReactiveAssert.Throws(() => Observable.RefCount(null, 2, TimeSpan.FromSeconds(2))); ReactiveAssert.Throws(() => Observable.RefCount(Observable.Never().Publish(), TimeSpan.FromSeconds(2), null)); ReactiveAssert.Throws(() => Observable.RefCount(Observable.Never().Publish(), 0, TimeSpan.FromSeconds(2))); ReactiveAssert.Throws(() => Observable.RefCount(Observable.Never().Publish(), -1, TimeSpan.FromSeconds(2))); ReactiveAssert.Throws(() => Observable.RefCount(Observable.Never().Publish(), 2, TimeSpan.FromSeconds(2), null)); ReactiveAssert.Throws(() => Observable.RefCount(Observable.Never().Publish(), 0, TimeSpan.FromSeconds(2), Scheduler.Default)); ReactiveAssert.Throws(() => Observable.RefCount(Observable.Never().Publish(), -1, TimeSpan.FromSeconds(2), Scheduler.Default)); } [TestMethod] public void RefCount_DelayedDisconnect_ConnectsOnFirst() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnCompleted(250) ); var subject = new MySubject(); var conn = new ConnectableObservable(xs, subject); var res = scheduler.Start(() => conn.RefCount(TimeSpan.FromSeconds(2)) ); res.Messages.AssertEqual( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnCompleted(250) ); Assert.True(subject.Disposed); } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_ConnectsOnObserverThresholdReached() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnCompleted(250) ); var subject = new MySubject(); var conn = new ConnectableObservable(xs, subject); var res = conn.RefCount(2, TimeSpan.FromTicks(300)); var d1 = default(IDisposable); var o1 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(210, () => { d1 = res.Subscribe(o1); }); var d2 = default(IDisposable); var o2 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); }); scheduler.Start(); o1.Messages.AssertEqual( OnNext(230, 3), OnNext(240, 4), OnCompleted(250) ); Assert.True(subject.Disposed); } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_SourceProducesValuesAndCompletesInSubscribe() { var connected = 0; var source = Observable.Defer(() => { connected++; return Observable.Range(1, 5); }) .Publish() .RefCount(2, TimeSpan.FromMinutes(1)); Assert.Equal(0, connected); var list1 = new List(); source.Subscribe(list1.Add); Assert.Equal(0, connected); Assert.Empty(list1); var list2 = new List(); source.Subscribe(list2.Add); Assert.Equal(1, connected); var expected = new List([1, 2, 3, 4, 5]); Assert.Equal(expected, list1); Assert.Equal(expected, list2); } [TestMethod] public void RefCount_DelayedDisconnect_SourceCompletesWithNoValuesInSubscribe() { var subscribed = 0; var unsubscribed = 0; var o1 = Observable.Create(observer => { subscribed++; observer.OnCompleted(); return Disposable.Create(() => unsubscribed++); }); var o2 = o1.Publish().RefCount(TimeSpan.FromSeconds(20)); var s1 = o2.Subscribe(); Assert.Equal(1, subscribed); Assert.Equal(1, unsubscribed); var s2 = o2.Subscribe(); Assert.Equal(1, subscribed); Assert.Equal(1, unsubscribed); } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_SourceCompletesWithNoValuesInSubscribe() { var scheduler = new TestScheduler(); var subscribed = 0; var unsubscribed = 0; var o1 = Observable.Create(observer => { subscribed++; observer.OnCompleted(); return Disposable.Create(() => unsubscribed++); }); var o2 = o1.Publish().RefCount(2, TimeSpan.FromTicks(10), scheduler); var s1 = o2.Subscribe(); Assert.Equal(0, subscribed); Assert.Equal(0, unsubscribed); // Note that although we've got a delayed disconnect, we don't need to call AdvanceBy // here because the source itself completes. The disconnect is triggered by the source, // not the RefCount in this test. var s2 = o2.Subscribe(); Assert.Equal(1, subscribed); Assert.Equal(1, unsubscribed); s1.Dispose(); s2.Dispose(); // At this point, the RefCount has 0 subscribers, and will have disconnected from // its source. When we add a new subscriber, the count will be at 0, which is below // minObservers, so we don't expect a new connection. RefCount _will_ call Subscribe // on its source, but that source is the Subject created by Publish(). And since // o1 already delivered an OnComplete, that Subject is now in a completed state, so // it will immediately complete any further subscriptions. RefCount sees this, so // although the connection count briefly goes up to 1, it will then go back down to // 0 before this call to Subscribe returns. // Basically, because this test uses o1.Publish(), once our connectable source source // completes is it incapable of restarting. That's why we have other tests that use // SerialSingleNotificationConnectable - that enables us to build a source that resets var s3 = o2.Subscribe(); Assert.Equal(1, subscribed); Assert.Equal(1, unsubscribed); // While it might look like adding a second subscriber should tip us back over the threshold // and trigger a reconnect, for the reasons described above o2 immediately completed in the // last call to subscribe, so the RefCount is zero at this point. This is a limitation of // Publish(). It doesn't really matter for this test, but it's why some tests use // SerialSingleNotificationConnectable. var s4 = o2.Subscribe(); Assert.Equal(1, subscribed); Assert.Equal(1, unsubscribed); } [TestMethod] public void RefCount_DelayedDisconnect_NotConnected() { var scheduler = new TestScheduler(); var disconnected = false; var count = 0; var xs = Observable.Defer(() => { count++; return Observable.Create(obs => { return () => { disconnected = true; }; }); }); var subject = new MySubject(); var conn = new ConnectableObservable(xs, subject); var refd = conn.RefCount(TimeSpan.FromTicks(20), scheduler); var dis1 = refd.Subscribe(); Assert.Equal(1, count); Assert.Equal(1, subject.SubscribeCount); Assert.False(disconnected); var dis2 = refd.Subscribe(); Assert.Equal(1, count); Assert.Equal(2, subject.SubscribeCount); Assert.False(disconnected); dis1.Dispose(); Assert.False(disconnected); dis2.Dispose(); Assert.False(disconnected); scheduler.AdvanceBy(19); Assert.False(disconnected); scheduler.AdvanceBy(1); Assert.True(disconnected); disconnected = false; var dis3 = refd.Subscribe(); Assert.Equal(2, count); Assert.Equal(3, subject.SubscribeCount); Assert.False(disconnected); dis3.Dispose(); scheduler.AdvanceBy(20); Assert.True(disconnected); } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_NotConnected() { var connected = 0; var source = Observable.Defer(() => { connected++; return Observable.Never(); }) .Publish() .RefCount(2, TimeSpan.FromMinutes(1)); Assert.Equal(0, connected); source.Subscribe(); Assert.Equal(0, connected); } [TestMethod] public void RefCount_DelayedDisconnect_OnError() { var ex = new Exception(); var xs = Observable.Throw(ex, Scheduler.Immediate); var res = xs.Publish().RefCount(TimeSpan.FromSeconds(2)); res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception()); res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception()); } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_OnError() { var ex = new Exception(); var xs = Observable.Throw(ex, Scheduler.Immediate); var res = xs.Publish().RefCount(2, TimeSpan.FromSeconds(200)); var exceptionsReceived = new List(); void AddSubscriber() { res.Subscribe( _ => { Assert.Fail("OnNext unexpected"); }, ex_ => { exceptionsReceived.Add(ex); }, () => { Assert.Fail("OnComplete unexpected"); }); } AddSubscriber(); Assert.Equal(0, exceptionsReceived.Count); AddSubscriber(); Assert.Equal(2, exceptionsReceived.Count); Assert.Same(ex, exceptionsReceived[0]); Assert.Same(ex, exceptionsReceived[1]); } [TestMethod] public void RefCount_DelayedDisconnect_HotSourceMultipleSubscribers() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnNext(270, 7), OnNext(280, 8), OnNext(290, 9), OnCompleted(300) ); var res = xs.Publish().RefCount(TimeSpan.FromTicks(9), scheduler); var d1 = default(IDisposable); var o1 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); }); scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); }); var d2 = default(IDisposable); var o2 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); }); scheduler.ScheduleAbsolute(275, () => { d2.Dispose(); }); var d3 = default(IDisposable); var o3 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); }); scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); }); var d4 = default(IDisposable); var o4 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); }); scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); }); scheduler.Start(); o1.Messages.AssertEqual( OnNext(220, 2), OnNext(230, 3) ); o2.Messages.AssertEqual( OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnNext(270, 7) ); o3.Messages.AssertEqual( OnNext(260, 6) ); o4.Messages.AssertEqual( OnNext(290, 9), OnCompleted(300) ); xs.Subscriptions.AssertEqual( Subscribe(215, 284), Subscribe(285, 300) ); } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_HotSourceMultipleSubscribers() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), // 0 subscribers OnNext(220, 2), // 1 subscriber OnNext(230, 3), // 2 subscribers OnNext(240, 4), // 1 subscriber OnNext(250, 5), // 1 subscriber OnNext(260, 6), // 2 subscribers OnNext(270, 7), // 1 subscribers OnNext(280, 8), // 0 subscribers OnNext(290, 9), // 1 subscribers OnNext(300, 10), // 2 subscribers OnCompleted(310) ); var res = xs.Publish().RefCount(2, TimeSpan.FromTicks(9), scheduler); var d1 = default(IDisposable); var o1 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); }); scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); }); var d2 = default(IDisposable); var o2 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); }); scheduler.ScheduleAbsolute(275, () => { d2.Dispose(); }); var d3 = default(IDisposable); var o3 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); }); scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); }); var d4 = default(IDisposable); var o4 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); }); scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); }); var d5 = default(IDisposable); var o5 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(295, () => { d5 = res.Subscribe(o5); }); scheduler.ScheduleAbsolute(320, () => { d5.Dispose(); }); scheduler.Start(); o1.Messages.AssertEqual( OnNext(230, 3) ); o2.Messages.AssertEqual( OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnNext(270, 7) ); o3.Messages.AssertEqual( OnNext(260, 6) ); o4.Messages.AssertEqual( OnNext(300, 10), OnCompleted(310) ); o5.Messages.AssertEqual( OnNext(300, 10), OnCompleted(310) ); xs.Subscriptions.AssertEqual( Subscribe(225, 284), Subscribe(295, 310) ); } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_SubscriptionsDropBelowThresholdButNotToZero() { var subject = new ReplaySubject(5); var connected = 0; var source = Observable.Defer(() => { connected++; return subject; }) .Publish() .RefCount(2, TimeSpan.FromMinutes(1)); subject.OnNext(1); Assert.Equal(0, connected); var list1 = new List(); var sub1 = source.Subscribe(list1.Add); Assert.Equal(0, connected); Assert.Empty(list1); subject.OnNext(2); var list2 = new List(); var sub2 = source.Subscribe(list2.Add); // Since connection only occurred with the 2nd subscriber, we expect both to get everything // the ReplaySubject has stored. List expectedSub1 = [1, 2]; var expectedSub2 = expectedSub1; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub1, list2); Assert.Equal(1, connected); subject.OnNext(3); // Both subscribers should have received the new item. expectedSub1 = expectedSub2 = [1, 2, 3]; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(1, connected); var list3 = new List(); source.Subscribe(list3.Add); // Since we were already connected, the 3rd subscriber just gets added to the observers of // the Publish multicast output, and no new connection should occur to the underlying ReplaySubject. // So for this 3rd subscription, no new items should be received by any of the subscribers List expectedSub3 = []; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); Assert.Equal(1, connected); subject.OnNext(4); // All the current subscribers should have received that latest item. expectedSub1 = expectedSub2 = [1, 2, 3, 4]; expectedSub3 = [4]; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); Assert.Equal(1, connected); sub1.Dispose(); subject.OnNext(5); // The two remaining subscribers should have received that new item, but the one that just // unsubscribed should not. expectedSub1 = [1, 2, 3, 4]; expectedSub2 = [1, 2, 3, 4, 5]; expectedSub3 = [4, 5]; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); Assert.Equal(1, connected); sub2.Dispose(); subject.OnNext(6); // We are now below the minObservers threshold of 2, but that threshold only governs when we move // from a disconnected state to a connected state. We should remain connected as long as there is // at least one subscriber, so we expect the remaining subscriber to receive that last item. expectedSub1 = [1, 2, 3, 4]; expectedSub2 = [1, 2, 3, 4, 5]; expectedSub3 = [4, 5, 6]; Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); Assert.Equal(1, connected); } [TestMethod] public void RefCount_DelayedDisconnect_SubscriptionsDropBelowThresholdAndThenBackAbove() { var scheduler = new TestScheduler(); var sourceAfterInitial = new Subject(); var connected = 0; var source = Observable.Defer(() => { connected++; return Observable.Range(1, 5).Concat(sourceAfterInitial); }) .Publish() .RefCount(2, TimeSpan.FromTicks(10), scheduler); Assert.Equal(0, connected); var list1 = new List(); var sub1 = source.Subscribe(list1.Add); // 1 subscriber Assert.Equal(0, connected); Assert.Empty(list1); var list2 = new List(); var sub2 = source.Subscribe(list2.Add); // 2 subscribers Assert.Equal(1, connected); sourceAfterInitial.OnNext(6); sub1.Dispose(); // 1 subscriber // We don't expect a disconnect, but provide enough time for one to occur, should that bug ever creep in scheduler.AdvanceBy(10); Assert.Equal(1, connected); sourceAfterInitial.OnNext(7); Assert.Equal(1, connected); var list3 = new List(); var sub3 = source.Subscribe(list3.Add); // This is the distinguishing feature of this test. With that last subscription, we went from 1 // subscriber (below minObservers) but still connected (because we already hit minObservers once // and never dropped to zero), and now we're passing through minObservers again. We used to have // a bug where we would erroneously attempt to reconnect at this point. Assert.Equal(1, connected); sourceAfterInitial.OnNext(8); var expectedSub1 = new List([1, 2, 3, 4, 5, 6]); var expectedSub2 = new List([1, 2, 3, 4, 5, 6, 7, 8]); var expectedSub3 = new List([8]); Assert.Equal(expectedSub1, list1); Assert.Equal(expectedSub2, list2); Assert.Equal(expectedSub3, list3); } [TestMethod] public void RefCount_DelayedDisconnect_SubscriptionsDropToZeroThenNewSubscriptionArrivesBeforeDisconnectDelay() { var scheduler = new TestScheduler(); var source = new SerialSingleNotificationConnectable(Notification.CreateOnNext(1)); var rco = source.RefCount(TimeSpan.FromTicks(10), scheduler); var s1 = rco.Subscribe(); s1.Dispose(); // There are now 0 subscribers, but the time for the disconnect has not yet come. Assert.Equal(1, source.Connections.Count); Assert.False(source.Connections[0].Disposed); scheduler.AdvanceBy(9); // The time has still not come, Assert.Equal(1, source.Connections.Count); Assert.False(source.Connections[0].Disposed); // Since we were still connected, this should move the connection from a 'waiting to // shut down' state into an active state. var seen = 0; var terminated = false; var s2 = rco.Subscribe(x => seen = x, () => terminated = true); source.DeliverNotificationForActiveConnection(Notification.CreateOnNext(2)); Assert.Equal(2, seen); Assert.False(terminated); Assert.False(source.Connections[0].Disposed); // This moves us past the time when `RefCount` would have shut down the connection if no new // subscriptions had turned up. scheduler.AdvanceBy(2); Assert.False(terminated); Assert.False(source.Connections[0].Disposed); // We should be able to advance well beyond the disconnect delay because we have an active // subscriber. scheduler.AdvanceBy(20); Assert.False(terminated); Assert.False(source.Connections[0].Disposed); } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_SubscriptionsDropToZeroThenNewSubscriptionArrivesBeforeDisconnectDelay() { var scheduler = new TestScheduler(); var source = new SerialSingleNotificationConnectable(Notification.CreateOnNext(1)); var rco = source.RefCount(2, TimeSpan.FromTicks(10), scheduler); var s1 = rco.Subscribe(); var s2 = rco.Subscribe(); s1.Dispose(); s2.Dispose(); // There are now 0 subscribers, but the time for the disconnect has not yet come. Assert.Equal(1, source.Connections.Count); Assert.False(source.Connections[0].Disposed); scheduler.AdvanceBy(9); // The time has still not come, Assert.Equal(1, source.Connections.Count); Assert.False(source.Connections[0].Disposed); // Since we were still connected, this should move the connection from a 'waiting to // shut down' state into an active state. (We're below the minObservers threshold, but // that just determines when Connect is called. RefCount has historically always waited // for the subscription count to reach 0 before disconnecting, so if that count goes // above 0 while we were waiting for the disconnect delay, it should return to an // active state.) var seen = 0; var terminated = false; var s3 = rco.Subscribe(x => seen = x, () => terminated = true); source.DeliverNotificationForActiveConnection(Notification.CreateOnNext(2)); Assert.Equal(2, seen); Assert.False(terminated); Assert.False(source.Connections[0].Disposed); // This moves us past the time when `RefCount` would have shut down the connection if // no new subscriptions had turned up. The arrival of a new subscriber should ensure // that we remain connected. scheduler.AdvanceBy(2); Assert.False(terminated); Assert.False(source.Connections[0].Disposed); // We should be able to advance well beyond the disconnect delay because we have an active // subscriber. scheduler.AdvanceBy(20); Assert.False(terminated); Assert.False(source.Connections[0].Disposed); } [TestMethod] public void RefCount_DelayedDisconnect_ValuesDuringAndAfterSubscribe() { var subject = new ReplaySubject(5); var source = subject.Publish().RefCount(TimeSpan.FromSeconds(20)); subject.OnNext(1); // Although the source is a ReplaySubject, the use of Publish means there will only be // a single subscription to the ReplaySubject, so it will only replay one. (It will replay // that first value on the initial connect.) So we expect each subscriber to see fewer and // fewer values. // all subscribers will see all the values List expected1 = [1]; var list1 = new List(); source.Subscribe(list1.Add); Assert.Equal(expected1, list1); subject.OnNext(2); var list2 = new List(); source.Subscribe(list2.Add); expected1 = [1, 2]; List expected2 = []; Assert.Equal(expected1, list1); Assert.Equal(expected2, list2); subject.OnNext(3); var list3 = new List(); source.Subscribe(list3.Add); expected1 = [1, 2, 3]; expected2 = [3]; List expected3 = []; Assert.Equal(expected1, list1); Assert.Equal(expected2, list2); Assert.Equal(expected3, list3); subject.OnNext(4); expected1 = [1, 2, 3, 4]; expected2 = [3, 4]; expected3 = [4]; Assert.Equal(expected1, list1); Assert.Equal(expected2, list2); Assert.Equal(expected3, list3); } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_ValuesDuringAndAfterSubscribe() { var subject = new ReplaySubject(5); var source = subject.Publish().RefCount(2, TimeSpan.FromSeconds(20)); subject.OnNext(1); var list1 = new List(); source.Subscribe(list1.Add); Assert.Empty(list1); subject.OnNext(2); List expected1and2 = [1, 2]; var list2 = new List(); source.Subscribe(list2.Add); Assert.Equal(expected1and2, list1); Assert.Equal(expected1and2, list2); subject.OnNext(3); expected1and2 = [1, 2, 3]; Assert.Equal(expected1and2, list1); Assert.Equal(expected1and2, list2); var list3 = new List(); source.Subscribe(list3.Add); List expected3 = []; Assert.Equal(expected1and2, list1); Assert.Equal(expected1and2, list2); Assert.Equal(expected3, list3); subject.OnNext(4); expected1and2 = [1, 2, 3, 4]; expected3 = [4]; Assert.Equal(expected1and2, list1); Assert.Equal(expected1and2, list2); Assert.Equal(expected3, list3); } [TestMethod] [DataRow(true)] [DataRow(false)] public void RefCount_DelayedDisconnect_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletion( bool reSubscribeBeforeDelayedDisconnect) { var scheduler = new TestScheduler(); var seen = 0; var terminated = false; // On initial subscription, the source will produce one value and will not complete. var connectable = new SerialSingleNotificationConnectable(Notification.CreateOnNext(36)); var refCount = connectable.RefCount(TimeSpan.FromTicks(10), scheduler); using (refCount.Subscribe(value => seen = value, () => terminated = true)) { Assert.Equal(36, seen); Assert.Equal(1, connectable.Connections.Count); Assert.False(connectable.Connections[0].Disposed); } Assert.False(connectable.Connections[0].Disposed); // For these initial subscriptions, we allow enough time for the delayed disconnect to occur even if // reSubscribeBeforeDelayedDisconnect is false, because it's the resubscription after a source-induced // completion that this test is interested in. scheduler.AdvanceBy(11); Assert.Equal(1, connectable.Connections.Count); Assert.True(connectable.Connections[0].Disposed); seen = 0; terminated = false; // This time around, when Connect is called, all subscriptions after the preceding Connect will be // completed. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted()); using (refCount.Subscribe(value => seen = value, () => terminated = true)) { Assert.Equal(0, seen); Assert.True(terminated); Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); } Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); scheduler.AdvanceBy(reSubscribeBeforeDelayedDisconnect ? 1 : 11); Assert.Equal(2, connectable.Connections.Count); Assert.Equal(!reSubscribeBeforeDelayedDisconnect, connectable.Connections[1].Disposed); seen = 0; terminated = false; // Now we go back to the initial behaviour in which the source produces one value and does not complete. connectable.SetNotificationForNextConnect(Notification.CreateOnNext(42)); using (refCount.Subscribe(value => seen = value, () => terminated = true)) { Assert.Equal(reSubscribeBeforeDelayedDisconnect ? 0 : 42, seen); Assert.Equal(reSubscribeBeforeDelayedDisconnect, terminated); Assert.Equal(reSubscribeBeforeDelayedDisconnect ? 2 : 3, connectable.Connections.Count); Assert.False(connectable.Connections[reSubscribeBeforeDelayedDisconnect ? 1 : 2].Disposed); } } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletionAndEnoughTimeForDisconnectHasPassed() { var scheduler = new TestScheduler(); var seen1 = 0; var seen2 = 0; var terminated1 = false; var terminated2 = false; // On initial subscription, the source will produce one value and will not complete. var connectable = new SerialSingleNotificationConnectable(Notification.CreateOnNext(36)); var refCount = connectable.RefCount(2, TimeSpan.FromTicks(10), scheduler); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.Equal(0, seen1); Assert.Empty(connectable.Connections); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(36, seen1); Assert.Equal(36, seen2); Assert.Equal(1, connectable.Connections.Count); Assert.False(connectable.Connections[0].Disposed); } } Assert.Equal(1, connectable.Connections.Count); Assert.False(connectable.Connections[0].Disposed); scheduler.AdvanceBy(11); Assert.Equal(1, connectable.Connections.Count); Assert.True(connectable.Connections[0].Disposed); seen1 = seen2 = 0; terminated1 = terminated2 = false; // This time around, when Connect is called, all subscriptions after the preceding Connect will be // completed. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted()); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.Equal(1, connectable.Connections.Count); Assert.False(terminated1); Assert.False(terminated2); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(0, seen1); Assert.Equal(0, seen2); Assert.True(terminated1); Assert.True(terminated2); Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); } } Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); scheduler.AdvanceBy(11); Assert.Equal(2, connectable.Connections.Count); Assert.True(connectable.Connections[1].Disposed); seen1 = seen2 = 0; terminated1 = terminated2 = false; // Now we go back to the initial behaviour in which the source produces one value and does not complete. connectable.SetNotificationForNextConnect(Notification.CreateOnNext(42)); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.False(terminated1); Assert.Equal(0, seen1); Assert.False(terminated2); Assert.Equal(2, connectable.Connections.Count); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(42, seen1); Assert.Equal(42, seen2); Assert.False(terminated1); Assert.False(terminated2); Assert.Equal(3, connectable.Connections.Count); Assert.False(connectable.Connections[2].Disposed); } } } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletionAndEnoughTimeForDisconnectHasPassed_WithPreConnectNotifications() { var scheduler = new TestScheduler(); var seen1 = 0; var seen2 = 0; var terminated1 = false; var terminated2 = false; // On initial subscription, the source will produce one value and will not complete. var connectable = new SerialConnectableIgnoringConnect(new BehaviorSubject(36)); var refCount = connectable.RefCount(2, TimeSpan.FromTicks(10), scheduler); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { // The SerialConnectableConnectIgnoringObservable is unusual in that it can produce values before the // call to Connect. So we expect to see the value from the source, but not yet to // have seen a Connect call. Assert.Equal(36, seen1); Assert.Empty(connectable.Connections); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(36, seen1); Assert.Equal(36, seen2); Assert.Equal(1, connectable.Connections.Count); Assert.False(connectable.Connections[0].Disposed); } } Assert.Equal(1, connectable.Connections.Count); Assert.False(connectable.Connections[0].Disposed); scheduler.AdvanceBy(11); Assert.Equal(1, connectable.Connections.Count); Assert.True(connectable.Connections[0].Disposed); seen1 = seen2 = 0; terminated1 = terminated2 = false; // This time around, the source will complete when subscribed to. connectable.SetSource(Observable.Empty()); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { // Again, the SerialConnectableConnectIgnoringObservable's unsual behaviour of // delivering notifications immediately from subscription without waiting for the // Connect means we see the initial termination immediately (and no connection yet). Assert.True(terminated1); Assert.False(terminated2); Assert.Equal(1, connectable.Connections.Count); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(0, seen1); Assert.Equal(0, seen2); Assert.True(terminated1); Assert.True(terminated2); // Since the initial subscription completed immediately, the observer count // never got above 1, so we do not expect a second connection Assert.Equal(1, connectable.Connections.Count); Assert.True(connectable.Connections[0].Disposed); } } Assert.Equal(1, connectable.Connections.Count); scheduler.AdvanceBy(11); Assert.Equal(1, connectable.Connections.Count); seen1 = seen2 = 0; terminated1 = terminated2 = false; // Now we go back to the initial behaviour in which the source produces one value and does not complete. connectable.SetSource(new BehaviorSubject(42)); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.False(terminated1); Assert.Equal(42, seen1); Assert.False(terminated2); Assert.Equal(1, connectable.Connections.Count); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(42, seen1); Assert.Equal(42, seen2); Assert.False(terminated1); Assert.False(terminated2); Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); } } } [TestMethod] public void RefCount_DelayedDisconnect_minObservers_DoesNotConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletionButNotEnoughTimeForDelayedDisconnectHasPassed() { var scheduler = new TestScheduler(); var seen1 = 0; var seen2 = 0; var terminated1 = false; var terminated2 = false; // On initial subscription, the source will produce one value and will not complete. var connectable = new SerialSingleNotificationConnectable(Notification.CreateOnNext(36)); var refCount = connectable.RefCount(2, TimeSpan.FromTicks(10), scheduler); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.Equal(0, seen1); Assert.Empty(connectable.Connections); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(36, seen1); Assert.Equal(36, seen2); Assert.Equal(1, connectable.Connections.Count); Assert.False(connectable.Connections[0].Disposed); } } Assert.Equal(1, connectable.Connections.Count); Assert.False(connectable.Connections[0].Disposed); // For these initial subscriptions, we allow enough time for the delayed disconnect to occur, because // it's the resubscription after a source-induced completion that this test is interested in. scheduler.AdvanceBy(11); Assert.Equal(1, connectable.Connections.Count); Assert.True(connectable.Connections[0].Disposed); seen1 = seen2 = 0; terminated1 = terminated2 = false; // Any further subscriptions will be completed on the next Connect. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted()); using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.Equal(1, connectable.Connections.Count); Assert.False(terminated1); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(0, seen1); Assert.Equal(0, seen2); Assert.True(terminated1); Assert.True(terminated2); Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); } } Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); scheduler.AdvanceBy(5); Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); seen1 = seen2 = 0; terminated1 = terminated2 = false; // To verify that individual subscriptions continue to be forwarded to the underlying source even // when no reconnect occurs, we arrange for subsequent subscriptions to get receive a single value. // (This is a slightly odd thing to do, but it's not RefCount's place to have opinions on how the // source should behave.) connectable.Connections[1].ReplaceSource(new BehaviorSubject(42)); // The connection set up in the preceding section won't be torn down until the // specified disconnect delay has elapsed, so the expected behaviour if we try to establish // new subscriptions in that time is that their Subscribe will be passed through to the source, // and that we won't see any further connections. But now that the further subscriptions to the // source will result in a value (even though earlier subscriptions to the same source have been // completed) we expect these new subscriptions each to see the value. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.False(terminated1); Assert.Equal(42, seen1); Assert.False(terminated2); Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(42, seen1); Assert.Equal(42, seen2); Assert.False(terminated1); Assert.False(terminated2); Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); } } connectable.SetNotificationForNextConnect(Notification.CreateOnNext(99)); // If we advanced by enough for the deferred disconnect to occur, it should be able to create a fresh // connection to the underlying source, at which point we'll see the value again. // We were at 5, so this takes us to 11 since the initial connection, but we don't expect that to be // enough, because the deferred disconnection should be relative to the most recent subscription. scheduler.AdvanceBy(6); Assert.Equal(2, connectable.Connections.Count); Assert.False(connectable.Connections[1].Disposed); // Since the last subscription occurred at 5, advancing to 16 should trigger disconnection. And // since we're already up to 11, this should do it: scheduler.AdvanceBy(5); Assert.Equal(2, connectable.Connections.Count); Assert.True(connectable.Connections[1].Disposed); seen1 = seen2 = 0; terminated1 = terminated2 = false; using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true)) { Assert.Equal(0, seen1); Assert.Equal(2, connectable.Connections.Count); Assert.True(connectable.Connections[1].Disposed); using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true)) { Assert.Equal(99, seen1); Assert.Equal(99, seen2); Assert.False(terminated1); Assert.False(terminated2); Assert.Equal(3, connectable.Connections.Count); Assert.False(connectable.Connections[2].Disposed); } } } #endregion } }