RefCountTest.cs 82 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Disposables;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using Microsoft.Reactive.Testing;
  12. using Microsoft.VisualStudio.TestTools.UnitTesting;
  13. using Assert = Xunit.Assert;
  14. namespace ReactiveTests.Tests
  15. {
  16. [TestClass]
  17. public class RefCountTest : ReactiveTest
  18. {
  19. /// <summary>
  20. /// A connectable observable that provides an individual notification upon connection, where
  21. /// the notification can be different from one connection to the next.
  22. /// </summary>
  23. /// <typeparam name="T">Element type.</typeparam>
  24. /// <remarks>
  25. /// <para>
  26. /// The most important capability this provides is to be able to provide values after
  27. /// having completed. Obviously it won't do that for any single subscription because that
  28. /// would break the basic Rx contract, but this can deliver completion to some subscribers,
  29. /// and then go on to deliver values to subsequent subscribers. (The connectable
  30. /// observables returned by <c>Publish</c> can't do this: once their subject has delivered
  31. /// a completion notification it can't deliver anything else, not even to new subscribers.
  32. /// That's why we need a specialized type.)
  33. /// </para>
  34. /// </remarks>
  35. private sealed class SerialSingleNotificationConnectable<T> : IConnectableObservable<T>
  36. {
  37. private readonly object _gate = new();
  38. private Notification<T> _notificationAtNextConnect;
  39. private Subject<T> _sourceForNextConnect = new();
  40. private Connection _nextConnectionInProgress;
  41. public SerialSingleNotificationConnectable(Notification<T> initialNotificationAtNextConnect)
  42. {
  43. _notificationAtNextConnect = initialNotificationAtNextConnect;
  44. _nextConnectionInProgress = new(_sourceForNextConnect);
  45. }
  46. public List<Connection> Connections { get; } = new();
  47. private Connection ActiveConnection => (Connections.Count > 0 &&
  48. Connections[Connections.Count - 1] is Connection { Disposed: false } activeConnection)
  49. ? activeConnection : null;
  50. private Connection CurrentConnection => ActiveConnection ?? _nextConnectionInProgress;
  51. public void SetNotificationForNextConnect(Notification<T> notification)
  52. {
  53. _notificationAtNextConnect = notification;
  54. }
  55. public void DeliverNotificationForActiveConnection(Notification<T> notification)
  56. {
  57. if (ActiveConnection is not Connection activeConnection)
  58. {
  59. throw new InvalidOperationException("No connection is currently active");
  60. }
  61. if (activeConnection.Source is not Subject<T> source)
  62. {
  63. throw new InvalidOperationException("Active connection's source has been replaced and is no longer a Subject<T>, so it is not possible to deliver further notifications to current subscribers");
  64. }
  65. notification.Accept(source);
  66. }
  67. public IDisposable Connect()
  68. {
  69. Connection connecting;
  70. Notification<T> notification;
  71. Subject<T> source;
  72. lock (_gate)
  73. {
  74. connecting = _nextConnectionInProgress;
  75. notification = _notificationAtNextConnect;
  76. source = _sourceForNextConnect;
  77. _sourceForNextConnect = new Subject<T>();
  78. _nextConnectionInProgress = new(_sourceForNextConnect);
  79. Connections.Add(connecting);
  80. }
  81. notification.Accept(source);
  82. return connecting;
  83. }
  84. public IDisposable Subscribe(IObserver<T> observer)
  85. {
  86. Connection connection;
  87. lock (_gate)
  88. {
  89. connection = CurrentConnection;
  90. }
  91. return connection.Source.Subscribe(observer);
  92. }
  93. public sealed class Connection(IObservable<T> source) : IDisposable
  94. {
  95. /// <summary>
  96. /// Gets a value indicating whether this connection has been disposed.
  97. /// </summary>
  98. public bool Disposed { get; private set; }
  99. public IObservable<T> Source { get; private set; } = source;
  100. /// <summary>
  101. /// In scenarios where <see cref="Source"/> has entered a completed state, this
  102. /// replaces it with a new source so if further subscribers to the same connection
  103. /// come along, tests can deliver notifications to those.
  104. /// </summary>
  105. /// <remarks>
  106. /// Without this method, <see cref="SerialSingleNotificationConnectable{T}"/> will
  107. /// deliver events only when <see cref="Connect"/> is called, meaning that only
  108. /// observers that subscribed before that call will receive any notifications
  109. /// (unless the notification was <c>OnComplete</c>, in which case the subject
  110. /// enters a completed state, and completes all further subscribers). This enables
  111. /// tests to create scenarios where subscriptions made after <c>Connect</c> (and
  112. /// before that connection is disposed) can receive further notifications.
  113. /// </remarks>
  114. public void ReplaceSource(IObservable<T> source)
  115. {
  116. Source = source;
  117. }
  118. public void Dispose()
  119. {
  120. Disposed = true;
  121. }
  122. }
  123. }
  124. /// <summary>
  125. /// A connectable observable that logs calls to <see cref="Connect"/> but otherwise ignores
  126. /// them, forwarding <see cref="Subscribe"/> calls to the current underlying source (which
  127. /// can be changed over time).
  128. /// </summary>
  129. /// <typeparam name="T">Element type.</typeparam>
  130. /// <remarks>
  131. /// <para>
  132. /// This is similar to <see cref="SerialSingleNotificationConnectable{T}"/>, in that the
  133. /// underlying source can be changed over time, making it possible for this to complete
  134. /// observers, but then revert to a state where subsequent observers will not be completed.
  135. /// But this also enables simulation of unusual (but not strictly disallowed) behaviour,
  136. /// in which subscribers will receive notifications before calling <see cref="Connect"/>.
  137. /// It's useful to be able to do this because it can happen in more normal setups when
  138. /// sources completed synchronously, and it's easy to handle this incorrectly.
  139. /// </para>
  140. /// </remarks>
  141. private sealed class SerialConnectableIgnoringConnect<T> : IConnectableObservable<T>
  142. {
  143. private IObservable<T> _source;
  144. public SerialConnectableIgnoringConnect(IObservable<T> initialSource)
  145. {
  146. _source = initialSource;
  147. }
  148. public void SetSource(IObservable<T> source)
  149. {
  150. _source = source;
  151. }
  152. public List<Connection> Connections { get; } = new();
  153. public IDisposable Connect()
  154. {
  155. var connection = new Connection();
  156. Connections.Add(connection);
  157. return connection;
  158. }
  159. public IDisposable Subscribe(IObserver<T> observer)
  160. {
  161. return _source.Subscribe(observer);
  162. }
  163. public sealed class Connection() : IDisposable
  164. {
  165. /// <summary>
  166. /// Gets a value indicating whether this connection has been disposed.
  167. /// </summary>
  168. public bool Disposed { get; private set; }
  169. public void Dispose()
  170. {
  171. Disposed = true;
  172. }
  173. }
  174. }
  175. #region Immediate Disconnect
  176. [TestMethod]
  177. public void RefCount_NoDelay_ArgumentChecking()
  178. {
  179. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null));
  180. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, 2));
  181. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 0));
  182. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -1));
  183. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -2));
  184. }
  185. [TestMethod]
  186. public void RefCount_NoDelay_ConnectsOnFirst()
  187. {
  188. var scheduler = new TestScheduler();
  189. var xs = scheduler.CreateHotObservable(
  190. OnNext(210, 1),
  191. OnNext(220, 2),
  192. OnNext(230, 3),
  193. OnNext(240, 4),
  194. OnCompleted<int>(250)
  195. );
  196. var subject = new MySubject();
  197. var conn = new ConnectableObservable<int>(xs, subject);
  198. var res = scheduler.Start(() =>
  199. conn.RefCount()
  200. );
  201. res.Messages.AssertEqual(
  202. OnNext(210, 1),
  203. OnNext(220, 2),
  204. OnNext(230, 3),
  205. OnNext(240, 4),
  206. OnCompleted<int>(250)
  207. );
  208. Assert.True(subject.Disposed);
  209. }
  210. [TestMethod]
  211. public void RefCount_NoDelay_minObservers_ConnectsOnObserverThresholdReached()
  212. {
  213. var scheduler = new TestScheduler();
  214. var xs = scheduler.CreateHotObservable(
  215. OnNext(210, 1),
  216. OnNext(220, 2),
  217. OnNext(230, 3),
  218. OnNext(240, 4),
  219. OnCompleted<int>(250)
  220. );
  221. var subject = new MySubject();
  222. var conn = new ConnectableObservable<int>(xs, subject);
  223. var res = conn.RefCount(2);
  224. var d1 = default(IDisposable);
  225. var o1 = scheduler.CreateObserver<int>();
  226. scheduler.ScheduleAbsolute(205, () => { d1 = res.Subscribe(o1); });
  227. var d2 = default(IDisposable);
  228. var o2 = scheduler.CreateObserver<int>();
  229. scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
  230. scheduler.Start();
  231. o1.Messages.AssertEqual(
  232. OnNext(230, 3),
  233. OnNext(240, 4),
  234. OnCompleted<int>(250)
  235. );
  236. Assert.True(subject.Disposed);
  237. }
  238. [TestMethod]
  239. public void RefCount_NoDelay_SourceProducesValuesAndCompletesInConnect()
  240. {
  241. var connected = 0;
  242. var source = Observable.Defer(() =>
  243. {
  244. connected++;
  245. return Observable.Range(1, 5);
  246. })
  247. .Publish()
  248. .RefCount();
  249. Assert.Equal(0, connected);
  250. var list1 = new List<int>();
  251. source.Subscribe(list1.Add);
  252. Assert.Equal(1, connected);
  253. List<int> expected1 = [1, 2, 3, 4, 5];
  254. Assert.Equal(expected1, list1);
  255. var list2 = new List<int>();
  256. source.Subscribe(list2.Add);
  257. Assert.Equal(1, connected);
  258. Assert.Empty(list2);
  259. }
  260. [TestMethod]
  261. public void RefCount_NoDelay_minObservers_SourceProducesValuesAndCompletesInConnect()
  262. {
  263. var connected = 0;
  264. var source = Observable.Defer(() =>
  265. {
  266. connected++;
  267. return Observable.Range(1, 5);
  268. })
  269. .Publish()
  270. .RefCount(2);
  271. Assert.Equal(0, connected);
  272. var list1 = new List<int>();
  273. source.Subscribe(list1.Add);
  274. Assert.Equal(0, connected);
  275. Assert.Empty(list1);
  276. var list2 = new List<int>();
  277. source.Subscribe(list2.Add);
  278. Assert.Equal(1, connected);
  279. List<int> expected = [1, 2, 3, 4, 5];
  280. Assert.Equal(expected, list1);
  281. Assert.Equal(expected, list2);
  282. }
  283. [TestMethod]
  284. public void RefCount_NoDelay_SourceCompletesWithNoValuesInConnect()
  285. {
  286. var connectable = new SerialSingleNotificationConnectable<int>(Notification.CreateOnCompleted<int>());
  287. var refCount = connectable.RefCount();
  288. var s1 = refCount.Subscribe();
  289. Assert.Equal(1, connectable.Connections.Count);
  290. // Since the source immediately completed, the RefCount goes back to zero subscribers
  291. // inside the call to Connect, so we expect to be disconnected.
  292. Assert.True(connectable.Connections[0].Disposed);
  293. var s2 = refCount.Subscribe();
  294. Assert.Equal(2, connectable.Connections.Count);
  295. Assert.True(connectable.Connections[1].Disposed);
  296. }
  297. [TestMethod]
  298. public void RefCount_NoDelay_minObservers_SourceCompletesWithNoValuesInConnect()
  299. {
  300. var connectable = new SerialSingleNotificationConnectable<int>(Notification.CreateOnCompleted<int>());
  301. var refCount = connectable.RefCount(2);
  302. var s1 = refCount.Subscribe();
  303. Assert.Equal(0, connectable.Connections.Count);
  304. var s2 = refCount.Subscribe();
  305. Assert.Equal(1, connectable.Connections.Count);
  306. // Since the source completes immediately, we will have no active subscribers, so
  307. // we expect to be disconnected.
  308. Assert.True(connectable.Connections[0].Disposed);
  309. s1.Dispose();
  310. s2.Dispose();
  311. // Disposing subscriptions should change nothing because they self-completed.
  312. Assert.Equal(1, connectable.Connections.Count);
  313. // We're now back in the initial disconnected state, so nothing more should
  314. // happen until we get up to minObservers.
  315. var s3 = refCount.Subscribe();
  316. Assert.Equal(1, connectable.Connections.Count);
  317. var s4 = refCount.Subscribe();
  318. Assert.Equal(2, connectable.Connections.Count);
  319. Assert.True(connectable.Connections[1].Disposed);
  320. }
  321. [TestMethod]
  322. public void RefCount_NoDelay_NotConnected()
  323. {
  324. var disconnected = false;
  325. var count = 0;
  326. var xs = Observable.Defer(() =>
  327. {
  328. count++;
  329. return Observable.Create<int>(obs =>
  330. {
  331. return () => { disconnected = true; };
  332. });
  333. });
  334. var subject = new MySubject();
  335. var conn = new ConnectableObservable<int>(xs, subject);
  336. var refd = conn.RefCount();
  337. var dis1 = refd.Subscribe();
  338. Assert.Equal(1, count);
  339. Assert.Equal(1, subject.SubscribeCount);
  340. Assert.False(disconnected);
  341. var dis2 = refd.Subscribe();
  342. Assert.Equal(1, count);
  343. Assert.Equal(2, subject.SubscribeCount);
  344. Assert.False(disconnected);
  345. dis1.Dispose();
  346. Assert.False(disconnected);
  347. dis2.Dispose();
  348. Assert.True(disconnected);
  349. disconnected = false;
  350. var dis3 = refd.Subscribe();
  351. Assert.Equal(2, count);
  352. Assert.Equal(3, subject.SubscribeCount);
  353. Assert.False(disconnected);
  354. dis3.Dispose();
  355. Assert.True(disconnected);
  356. }
  357. [TestMethod]
  358. public void RefCount_NoDelay_minObservers_NotConnected()
  359. {
  360. var connected = 0;
  361. var source = Observable.Defer(() =>
  362. {
  363. connected++;
  364. return Observable.Never<int>();
  365. })
  366. .Publish()
  367. .RefCount(2);
  368. Assert.Equal(0, connected);
  369. source.Subscribe();
  370. Assert.Equal(0, connected);
  371. }
  372. [TestMethod]
  373. public void RefCount_NoDelay_OnError()
  374. {
  375. var ex = new Exception();
  376. var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
  377. var res = xs.Publish().RefCount();
  378. res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); });
  379. res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); });
  380. }
  381. [TestMethod]
  382. public void RefCount_NoDelay_minObservers_OnError()
  383. {
  384. var ex = new Exception();
  385. var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
  386. var res = xs.Publish().RefCount(2);
  387. var exceptionsReceived = new List<Exception>();
  388. void AddSubscriber()
  389. {
  390. res.Subscribe(
  391. _ => { Assert.Fail("OnNext unexpected"); },
  392. ex_ => { exceptionsReceived.Add(ex); },
  393. () => { Assert.Fail("OnComplete unexpected"); });
  394. }
  395. AddSubscriber();
  396. Assert.Equal(0, exceptionsReceived.Count);
  397. AddSubscriber();
  398. Assert.Equal(2, exceptionsReceived.Count);
  399. Assert.Same(ex, exceptionsReceived[0]);
  400. Assert.Same(ex, exceptionsReceived[1]);
  401. }
  402. [TestMethod]
  403. public void RefCount_NoDelay_HotSourceMultipleSubscribers()
  404. {
  405. var scheduler = new TestScheduler();
  406. var xs = scheduler.CreateHotObservable(
  407. OnNext(210, 1),
  408. OnNext(220, 2),
  409. OnNext(230, 3),
  410. OnNext(240, 4),
  411. OnNext(250, 5),
  412. OnNext(260, 6),
  413. OnNext(270, 7),
  414. OnNext(280, 8),
  415. OnNext(290, 9),
  416. OnCompleted<int>(300)
  417. );
  418. var res = xs.Publish().RefCount();
  419. var d1 = default(IDisposable);
  420. var o1 = scheduler.CreateObserver<int>();
  421. scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
  422. scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });
  423. var d2 = default(IDisposable);
  424. var o2 = scheduler.CreateObserver<int>();
  425. scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
  426. scheduler.ScheduleAbsolute(275, () => { d2.Dispose(); });
  427. var d3 = default(IDisposable);
  428. var o3 = scheduler.CreateObserver<int>();
  429. scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
  430. scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });
  431. var d4 = default(IDisposable);
  432. var o4 = scheduler.CreateObserver<int>();
  433. scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
  434. scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });
  435. scheduler.Start();
  436. o1.Messages.AssertEqual(
  437. OnNext(220, 2),
  438. OnNext(230, 3)
  439. );
  440. o2.Messages.AssertEqual(
  441. OnNext(230, 3),
  442. OnNext(240, 4),
  443. OnNext(250, 5),
  444. OnNext(260, 6),
  445. OnNext(270, 7)
  446. );
  447. o3.Messages.AssertEqual(
  448. OnNext(260, 6)
  449. );
  450. o4.Messages.AssertEqual(
  451. OnNext(290, 9),
  452. OnCompleted<int>(300)
  453. );
  454. xs.Subscriptions.AssertEqual(
  455. Subscribe(215, 275),
  456. Subscribe(285, 300)
  457. );
  458. }
  459. [TestMethod]
  460. public void RefCount_NoDelay_minObservers_HotSourceMultipleSubscribers()
  461. {
  462. var scheduler = new TestScheduler();
  463. var xs = scheduler.CreateHotObservable(
  464. OnNext(210, 1), // 0 subscribers
  465. OnNext(220, 2), // 1 subscriber
  466. OnNext(230, 3), // 2 subscribers
  467. OnNext(240, 4), // 1 subscriber
  468. OnNext(250, 5), // 1 subscriber
  469. OnNext(260, 6), // 2 subscribers
  470. OnNext(270, 7), // 1 subscribers
  471. OnNext(280, 8), // 0 subscribers
  472. OnNext(290, 9), // 1 subscribers
  473. OnNext(300, 10), // 2 subscribers
  474. OnCompleted<int>(310)
  475. );
  476. var res = xs.Publish().RefCount(2);
  477. var d1 = default(IDisposable);
  478. var o1 = scheduler.CreateObserver<int>();
  479. scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
  480. scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });
  481. var d2 = default(IDisposable);
  482. var o2 = scheduler.CreateObserver<int>();
  483. scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
  484. scheduler.ScheduleAbsolute(275, () => { d2.Dispose(); });
  485. var d3 = default(IDisposable);
  486. var o3 = scheduler.CreateObserver<int>();
  487. scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
  488. scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });
  489. var d4 = default(IDisposable);
  490. var o4 = scheduler.CreateObserver<int>();
  491. scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
  492. scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });
  493. var d5 = default(IDisposable);
  494. var o5 = scheduler.CreateObserver<int>();
  495. scheduler.ScheduleAbsolute(295, () => { d5 = res.Subscribe(o5); });
  496. scheduler.ScheduleAbsolute(320, () => { d5.Dispose(); });
  497. scheduler.Start();
  498. o1.Messages.AssertEqual(
  499. OnNext(230, 3)
  500. );
  501. o2.Messages.AssertEqual(
  502. OnNext(230, 3),
  503. OnNext(240, 4),
  504. OnNext(250, 5),
  505. OnNext(260, 6),
  506. OnNext(270, 7)
  507. );
  508. o3.Messages.AssertEqual(
  509. OnNext(260, 6)
  510. );
  511. o4.Messages.AssertEqual(
  512. OnNext(300, 10),
  513. OnCompleted<int>(310)
  514. );
  515. o5.Messages.AssertEqual(
  516. OnNext(300, 10),
  517. OnCompleted<int>(310)
  518. );
  519. xs.Subscriptions.AssertEqual(
  520. Subscribe(225, 275),
  521. Subscribe(295, 310)
  522. );
  523. }
  524. [TestMethod]
  525. public void RefCount_NoDelay_minObservers_SubscriptionsDropBelowThresholdButNotToZero()
  526. {
  527. var subject = new ReplaySubject<int>(5);
  528. var connected = 0;
  529. var source = Observable.Defer(() =>
  530. {
  531. connected++;
  532. return subject;
  533. })
  534. .Publish().RefCount(2);
  535. subject.OnNext(1);
  536. Assert.Equal(0, connected);
  537. var list1 = new List<int>();
  538. var sub1 = source.Subscribe(list1.Add);
  539. Assert.Equal(0, connected);
  540. Assert.Empty(list1);
  541. subject.OnNext(2);
  542. var list2 = new List<int>();
  543. var sub2 = source.Subscribe(list2.Add);
  544. // Since connection only occurred with the 2nd subscriber, we expect both to get everything
  545. // the ReplaySubject has stored.
  546. List<int> expectedSub1 = [1, 2];
  547. var expectedSub2 = expectedSub1;
  548. Assert.Equal(expectedSub1, list1);
  549. Assert.Equal(expectedSub1, list2);
  550. Assert.Equal(1, connected);
  551. subject.OnNext(3);
  552. // Both subscribers should have received the new item.
  553. expectedSub1 = expectedSub2 = [1, 2, 3];
  554. Assert.Equal(expectedSub1, list1);
  555. Assert.Equal(expectedSub2, list2);
  556. Assert.Equal(1, connected);
  557. var list3 = new List<int>();
  558. source.Subscribe(list3.Add);
  559. // Since we were already connected, the 3rd subscriber just gets added to the observers of
  560. // the Publish multicast output, and no new connection should occur to the underlying ReplaySubject.
  561. // So for this 3rd subscription, no new items should be received by any of the subscribers
  562. List<int> expectedSub3 = [];
  563. Assert.Equal(expectedSub1, list1);
  564. Assert.Equal(expectedSub2, list2);
  565. Assert.Equal(expectedSub3, list3);
  566. Assert.Equal(1, connected);
  567. subject.OnNext(4);
  568. // All the current subscribers should have received that latest item.
  569. expectedSub1 = expectedSub2 = [1, 2, 3, 4];
  570. expectedSub3 = [4];
  571. Assert.Equal(expectedSub1, list1);
  572. Assert.Equal(expectedSub2, list2);
  573. Assert.Equal(expectedSub3, list3);
  574. Assert.Equal(1, connected);
  575. sub1.Dispose();
  576. subject.OnNext(5);
  577. // The two remaining subscribers should have received that new item, but the one that just
  578. // unsubscribed should not.
  579. expectedSub1 = [1, 2, 3, 4];
  580. expectedSub2 = [1, 2, 3, 4, 5];
  581. expectedSub3 = [4, 5];
  582. Assert.Equal(expectedSub1, list1);
  583. Assert.Equal(expectedSub2, list2);
  584. Assert.Equal(expectedSub3, list3);
  585. Assert.Equal(1, connected);
  586. sub2.Dispose();
  587. subject.OnNext(6);
  588. // We are now below the minObservers threshold of 2, but that threshold only governs when we move
  589. // from a disconnected state to a connected state. We should remain connected as long as there is
  590. // at least one subscriber, so we expect the remaining subscriber to receive that last item.
  591. expectedSub1 = [1, 2, 3, 4];
  592. expectedSub2 = [1, 2, 3, 4, 5];
  593. expectedSub3 = [4, 5, 6];
  594. Assert.Equal(expectedSub1, list1);
  595. Assert.Equal(expectedSub2, list2);
  596. Assert.Equal(expectedSub3, list3);
  597. Assert.Equal(1, connected);
  598. }
  599. [TestMethod]
  600. public void RefCount_NoDelay_SubscriptionsDropBelowThresholdAndThenBackAbove()
  601. {
  602. var sourceAfterInitial = new Subject<int>();
  603. var connected = 0;
  604. var source = Observable.Defer(() =>
  605. {
  606. connected++;
  607. return Observable.Range(1, 5).Concat(sourceAfterInitial);
  608. })
  609. .Publish()
  610. .RefCount(2);
  611. Assert.Equal(0, connected);
  612. var list1 = new List<int>();
  613. var sub1 = source.Subscribe(list1.Add);
  614. Assert.Equal(0, connected);
  615. Assert.Empty(list1);
  616. var list2 = new List<int>();
  617. var sub2 = source.Subscribe(list2.Add);
  618. Assert.Equal(1, connected);
  619. sourceAfterInitial.OnNext(6);
  620. sub1.Dispose();
  621. sourceAfterInitial.OnNext(7);
  622. Assert.Equal(1, connected);
  623. var list3 = new List<int>();
  624. var sub3 = source.Subscribe(list3.Add);
  625. // This is the distinguishing feature of this test. With that last subscription, we went from 1
  626. // subscriber (below minObservers) but still connected (because we already hit minObservers once
  627. // and never dropped to zero), and now we're passing through minObservers again. We used to have
  628. // a bug where we would erroneously attempt to reconnect at this point.
  629. Assert.Equal(1, connected);
  630. sourceAfterInitial.OnNext(8);
  631. var expectedSub1 = new List<int>([1, 2, 3, 4, 5, 6]);
  632. var expectedSub2 = new List<int>([1, 2, 3, 4, 5, 6, 7, 8]);
  633. var expectedSub3 = new List<int>([8]);
  634. Assert.Equal(expectedSub1, list1);
  635. Assert.Equal(expectedSub2, list2);
  636. Assert.Equal(expectedSub3, list3);
  637. }
  638. [TestMethod]
  639. public void RefCount_NoDelay_ValuesDuringAndAfterSubscribe()
  640. {
  641. var subject = new ReplaySubject<int>(5);
  642. var source = subject.Publish().RefCount();
  643. subject.OnNext(1);
  644. // Although the source is a ReplaySubject, the use of Publish means there will only be
  645. // a single subscription to the ReplaySubject, so it will only replay one. (It will replay
  646. // that first value on the initial connect.) So we expect each subscriber to see fewer and
  647. // fewer values.
  648. // all subscribers will see all the values
  649. List<int> expected1 = [1];
  650. var list1 = new List<int>();
  651. source.Subscribe(list1.Add);
  652. Assert.Equal(expected1, list1);
  653. subject.OnNext(2);
  654. var list2 = new List<int>();
  655. source.Subscribe(list2.Add);
  656. expected1 = [1, 2];
  657. List<int> expected2 = [];
  658. Assert.Equal(expected1, list1);
  659. Assert.Equal(expected2, list2);
  660. subject.OnNext(3);
  661. var list3 = new List<int>();
  662. source.Subscribe(list3.Add);
  663. expected1 = [1, 2, 3];
  664. expected2 = [3];
  665. List<int> expected3 = [];
  666. Assert.Equal(expected1, list1);
  667. Assert.Equal(expected2, list2);
  668. Assert.Equal(expected3, list3);
  669. subject.OnNext(4);
  670. expected1 = [1, 2, 3, 4];
  671. expected2 = [3, 4];
  672. expected3 = [4];
  673. Assert.Equal(expected1, list1);
  674. Assert.Equal(expected2, list2);
  675. Assert.Equal(expected3, list3);
  676. }
  677. [TestMethod]
  678. public void RefCount_NoDelay_minObservers_ValuesDuringAndAfterSubscribe()
  679. {
  680. var subject = new ReplaySubject<int>(5);
  681. var source = subject.Publish().RefCount(2);
  682. subject.OnNext(1);
  683. var list1 = new List<int>();
  684. source.Subscribe(list1.Add);
  685. Assert.Empty(list1);
  686. subject.OnNext(2);
  687. List<int> expected1and2 = [1, 2];
  688. var list2 = new List<int>();
  689. source.Subscribe(list2.Add);
  690. Assert.Equal(expected1and2, list1);
  691. Assert.Equal(expected1and2, list2);
  692. subject.OnNext(3);
  693. expected1and2 = [1, 2, 3];
  694. Assert.Equal(expected1and2, list1);
  695. Assert.Equal(expected1and2, list2);
  696. var list3 = new List<int>();
  697. source.Subscribe(list3.Add);
  698. List<int> expected3 = [];
  699. Assert.Equal(expected1and2, list1);
  700. Assert.Equal(expected1and2, list2);
  701. Assert.Equal(expected3, list3);
  702. subject.OnNext(4);
  703. expected1and2 = [1, 2, 3, 4];
  704. expected3 = [4];
  705. Assert.Equal(expected1and2, list1);
  706. Assert.Equal(expected1and2, list2);
  707. Assert.Equal(expected3, list3);
  708. }
  709. [TestMethod]
  710. public void RefCount_NoDelay_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletion()
  711. {
  712. var seen = 0;
  713. var terminated = false;
  714. // On initial subscription, the source will produce one value and will not complete.
  715. var connectable = new SerialSingleNotificationConnectable<int>(Notification.CreateOnNext(36));
  716. var refCount = connectable.RefCount();
  717. using (refCount.Subscribe(value => seen = value, () => terminated = true))
  718. {
  719. Assert.Equal(36, seen);
  720. }
  721. seen = 0;
  722. terminated = false;
  723. // This time around, the source will complete when subscribed to.
  724. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted<int>());
  725. using (refCount.Subscribe(value => seen = value, () => terminated = true))
  726. {
  727. Assert.Equal(0, seen);
  728. Assert.True(terminated);
  729. }
  730. seen = 0;
  731. terminated = false;
  732. // Now we go back to the initial behaviour in which the source produces one value and does not complete.
  733. connectable.SetNotificationForNextConnect(Notification.CreateOnNext(42));
  734. using (refCount.Subscribe(value => seen = value, () => terminated = true))
  735. {
  736. Assert.Equal(42, seen);
  737. Assert.False(terminated);
  738. }
  739. }
  740. [TestMethod]
  741. public void RefCount_NoDelay_minObservers_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletion()
  742. {
  743. var seen1 = 0;
  744. var seen2 = 0;
  745. var terminated1 = false;
  746. var terminated2 = false;
  747. // On initial subscription, the source will produce one value and will not complete.
  748. var connectable = new SerialSingleNotificationConnectable<int>(Notification.CreateOnNext(36));
  749. var refCount = connectable.RefCount(2);
  750. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  751. {
  752. Assert.Equal(0, seen1);
  753. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  754. {
  755. Assert.Equal(36, seen1);
  756. Assert.Equal(36, seen2);
  757. }
  758. }
  759. seen1 = seen2 = 0;
  760. terminated1 = terminated2 = false;
  761. // This time around, the source will complete when subscribed to.
  762. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted<int>());
  763. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  764. {
  765. Assert.False(terminated1);
  766. Assert.False(terminated2);
  767. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  768. {
  769. Assert.Equal(0, seen1);
  770. Assert.Equal(0, seen2);
  771. Assert.True(terminated1);
  772. Assert.True(terminated2);
  773. }
  774. }
  775. seen1 = seen2 = 0;
  776. terminated1 = terminated2 = false;
  777. // Now we go back to the initial behaviour in which the source produces one value and does not complete.
  778. connectable.SetNotificationForNextConnect(Notification.CreateOnNext(42));
  779. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  780. {
  781. Assert.Equal(0, seen1);
  782. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  783. {
  784. Assert.Equal(42, seen1);
  785. Assert.Equal(42, seen2);
  786. Assert.False(terminated1);
  787. Assert.False(terminated2);
  788. }
  789. }
  790. }
  791. #endregion
  792. #region Delayed Disconnect
  793. [TestMethod]
  794. public void RefCount_DelayedDisconnect_ArgumentChecking()
  795. {
  796. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, TimeSpan.FromSeconds(2)));
  797. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, TimeSpan.FromSeconds(2), Scheduler.Default));
  798. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, 2, TimeSpan.FromSeconds(2)));
  799. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, 2, TimeSpan.FromSeconds(2)));
  800. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount(Observable.Never<int>().Publish(), TimeSpan.FromSeconds(2), null));
  801. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 0, TimeSpan.FromSeconds(2)));
  802. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -1, TimeSpan.FromSeconds(2)));
  803. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 2, TimeSpan.FromSeconds(2), null));
  804. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 0, TimeSpan.FromSeconds(2), Scheduler.Default));
  805. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -1, TimeSpan.FromSeconds(2), Scheduler.Default));
  806. }
  807. [TestMethod]
  808. public void RefCount_DelayedDisconnect_ConnectsOnFirst()
  809. {
  810. var scheduler = new TestScheduler();
  811. var xs = scheduler.CreateHotObservable(
  812. OnNext(210, 1),
  813. OnNext(220, 2),
  814. OnNext(230, 3),
  815. OnNext(240, 4),
  816. OnCompleted<int>(250)
  817. );
  818. var subject = new MySubject();
  819. var conn = new ConnectableObservable<int>(xs, subject);
  820. var res = scheduler.Start(() =>
  821. conn.RefCount(TimeSpan.FromSeconds(2))
  822. );
  823. res.Messages.AssertEqual(
  824. OnNext(210, 1),
  825. OnNext(220, 2),
  826. OnNext(230, 3),
  827. OnNext(240, 4),
  828. OnCompleted<int>(250)
  829. );
  830. Assert.True(subject.Disposed);
  831. }
  832. [TestMethod]
  833. public void RefCount_DelayedDisconnect_minObservers_ConnectsOnObserverThresholdReached()
  834. {
  835. var scheduler = new TestScheduler();
  836. var xs = scheduler.CreateHotObservable(
  837. OnNext(210, 1),
  838. OnNext(220, 2),
  839. OnNext(230, 3),
  840. OnNext(240, 4),
  841. OnCompleted<int>(250)
  842. );
  843. var subject = new MySubject();
  844. var conn = new ConnectableObservable<int>(xs, subject);
  845. var res = conn.RefCount(2, TimeSpan.FromTicks(300));
  846. var d1 = default(IDisposable);
  847. var o1 = scheduler.CreateObserver<int>();
  848. scheduler.ScheduleAbsolute(210, () => { d1 = res.Subscribe(o1); });
  849. var d2 = default(IDisposable);
  850. var o2 = scheduler.CreateObserver<int>();
  851. scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
  852. scheduler.Start();
  853. o1.Messages.AssertEqual(
  854. OnNext(230, 3),
  855. OnNext(240, 4),
  856. OnCompleted<int>(250)
  857. );
  858. Assert.True(subject.Disposed);
  859. }
  860. [TestMethod]
  861. public void RefCount_DelayedDisconnect_minObservers_SourceProducesValuesAndCompletesInSubscribe()
  862. {
  863. var connected = 0;
  864. var source = Observable.Defer(() =>
  865. {
  866. connected++;
  867. return Observable.Range(1, 5);
  868. })
  869. .Publish()
  870. .RefCount(2, TimeSpan.FromMinutes(1));
  871. Assert.Equal(0, connected);
  872. var list1 = new List<int>();
  873. source.Subscribe(list1.Add);
  874. Assert.Equal(0, connected);
  875. Assert.Empty(list1);
  876. var list2 = new List<int>();
  877. source.Subscribe(list2.Add);
  878. Assert.Equal(1, connected);
  879. var expected = new List<int>([1, 2, 3, 4, 5]);
  880. Assert.Equal(expected, list1);
  881. Assert.Equal(expected, list2);
  882. }
  883. [TestMethod]
  884. public void RefCount_DelayedDisconnect_SourceCompletesWithNoValuesInSubscribe()
  885. {
  886. var subscribed = 0;
  887. var unsubscribed = 0;
  888. var o1 = Observable.Create<string>(observer =>
  889. {
  890. subscribed++;
  891. observer.OnCompleted();
  892. return Disposable.Create(() => unsubscribed++);
  893. });
  894. var o2 = o1.Publish().RefCount(TimeSpan.FromSeconds(20));
  895. var s1 = o2.Subscribe();
  896. Assert.Equal(1, subscribed);
  897. Assert.Equal(1, unsubscribed);
  898. var s2 = o2.Subscribe();
  899. Assert.Equal(1, subscribed);
  900. Assert.Equal(1, unsubscribed);
  901. }
  902. [TestMethod]
  903. public void RefCount_DelayedDisconnect_minObservers_SourceCompletesWithNoValuesInSubscribe()
  904. {
  905. var scheduler = new TestScheduler();
  906. var subscribed = 0;
  907. var unsubscribed = 0;
  908. var o1 = Observable.Create<string>(observer =>
  909. {
  910. subscribed++;
  911. observer.OnCompleted();
  912. return Disposable.Create(() => unsubscribed++);
  913. });
  914. var o2 = o1.Publish().RefCount(2, TimeSpan.FromTicks(10), scheduler);
  915. var s1 = o2.Subscribe();
  916. Assert.Equal(0, subscribed);
  917. Assert.Equal(0, unsubscribed);
  918. // Note that although we've got a delayed disconnect, we don't need to call AdvanceBy
  919. // here because the source itself completes. The disconnect is triggered by the source,
  920. // not the RefCount in this test.
  921. var s2 = o2.Subscribe();
  922. Assert.Equal(1, subscribed);
  923. Assert.Equal(1, unsubscribed);
  924. s1.Dispose();
  925. s2.Dispose();
  926. // At this point, the RefCount has 0 subscribers, and will have disconnected from
  927. // its source. When we add a new subscriber, the count will be at 0, which is below
  928. // minObservers, so we don't expect a new connection. RefCount _will_ call Subscribe
  929. // on its source, but that source is the Subject created by Publish(). And since
  930. // o1 already delivered an OnComplete, that Subject is now in a completed state, so
  931. // it will immediately complete any further subscriptions. RefCount sees this, so
  932. // although the connection count briefly goes up to 1, it will then go back down to
  933. // 0 before this call to Subscribe returns.
  934. // Basically, because this test uses o1.Publish(), once our connectable source source
  935. // completes is it incapable of restarting. That's why we have other tests that use
  936. // SerialSingleNotificationConnectable - that enables us to build a source that resets
  937. var s3 = o2.Subscribe();
  938. Assert.Equal(1, subscribed);
  939. Assert.Equal(1, unsubscribed);
  940. // While it might look like adding a second subscriber should tip us back over the threshold
  941. // and trigger a reconnect, for the reasons described above o2 immediately completed in the
  942. // last call to subscribe, so the RefCount is zero at this point. This is a limitation of
  943. // Publish(). It doesn't really matter for this test, but it's why some tests use
  944. // SerialSingleNotificationConnectable.
  945. var s4 = o2.Subscribe();
  946. Assert.Equal(1, subscribed);
  947. Assert.Equal(1, unsubscribed);
  948. }
  949. [TestMethod]
  950. public void RefCount_DelayedDisconnect_NotConnected()
  951. {
  952. var scheduler = new TestScheduler();
  953. var disconnected = false;
  954. var count = 0;
  955. var xs = Observable.Defer(() =>
  956. {
  957. count++;
  958. return Observable.Create<int>(obs =>
  959. {
  960. return () => { disconnected = true; };
  961. });
  962. });
  963. var subject = new MySubject();
  964. var conn = new ConnectableObservable<int>(xs, subject);
  965. var refd = conn.RefCount(TimeSpan.FromTicks(20), scheduler);
  966. var dis1 = refd.Subscribe();
  967. Assert.Equal(1, count);
  968. Assert.Equal(1, subject.SubscribeCount);
  969. Assert.False(disconnected);
  970. var dis2 = refd.Subscribe();
  971. Assert.Equal(1, count);
  972. Assert.Equal(2, subject.SubscribeCount);
  973. Assert.False(disconnected);
  974. dis1.Dispose();
  975. Assert.False(disconnected);
  976. dis2.Dispose();
  977. Assert.False(disconnected);
  978. scheduler.AdvanceBy(19);
  979. Assert.False(disconnected);
  980. scheduler.AdvanceBy(1);
  981. Assert.True(disconnected);
  982. disconnected = false;
  983. var dis3 = refd.Subscribe();
  984. Assert.Equal(2, count);
  985. Assert.Equal(3, subject.SubscribeCount);
  986. Assert.False(disconnected);
  987. dis3.Dispose();
  988. scheduler.AdvanceBy(20);
  989. Assert.True(disconnected);
  990. }
  991. [TestMethod]
  992. public void RefCount_DelayedDisconnect_minObservers_NotConnected()
  993. {
  994. var connected = 0;
  995. var source = Observable.Defer(() =>
  996. {
  997. connected++;
  998. return Observable.Never<int>();
  999. })
  1000. .Publish()
  1001. .RefCount(2, TimeSpan.FromMinutes(1));
  1002. Assert.Equal(0, connected);
  1003. source.Subscribe();
  1004. Assert.Equal(0, connected);
  1005. }
  1006. [TestMethod]
  1007. public void RefCount_DelayedDisconnect_OnError()
  1008. {
  1009. var ex = new Exception();
  1010. var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
  1011. var res = xs.Publish().RefCount(TimeSpan.FromSeconds(2));
  1012. res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception());
  1013. res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception());
  1014. }
  1015. [TestMethod]
  1016. public void RefCount_DelayedDisconnect_minObservers_OnError()
  1017. {
  1018. var ex = new Exception();
  1019. var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
  1020. var res = xs.Publish().RefCount(2, TimeSpan.FromSeconds(200));
  1021. var exceptionsReceived = new List<Exception>();
  1022. void AddSubscriber()
  1023. {
  1024. res.Subscribe(
  1025. _ => { Assert.Fail("OnNext unexpected"); },
  1026. ex_ => { exceptionsReceived.Add(ex); },
  1027. () => { Assert.Fail("OnComplete unexpected"); });
  1028. }
  1029. AddSubscriber();
  1030. Assert.Equal(0, exceptionsReceived.Count);
  1031. AddSubscriber();
  1032. Assert.Equal(2, exceptionsReceived.Count);
  1033. Assert.Same(ex, exceptionsReceived[0]);
  1034. Assert.Same(ex, exceptionsReceived[1]);
  1035. }
  1036. [TestMethod]
  1037. public void RefCount_DelayedDisconnect_HotSourceMultipleSubscribers()
  1038. {
  1039. var scheduler = new TestScheduler();
  1040. var xs = scheduler.CreateHotObservable(
  1041. OnNext(210, 1),
  1042. OnNext(220, 2),
  1043. OnNext(230, 3),
  1044. OnNext(240, 4),
  1045. OnNext(250, 5),
  1046. OnNext(260, 6),
  1047. OnNext(270, 7),
  1048. OnNext(280, 8),
  1049. OnNext(290, 9),
  1050. OnCompleted<int>(300)
  1051. );
  1052. var res = xs.Publish().RefCount(TimeSpan.FromTicks(9), scheduler);
  1053. var d1 = default(IDisposable);
  1054. var o1 = scheduler.CreateObserver<int>();
  1055. scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
  1056. scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });
  1057. var d2 = default(IDisposable);
  1058. var o2 = scheduler.CreateObserver<int>();
  1059. scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
  1060. scheduler.ScheduleAbsolute(275, () =>
  1061. {
  1062. d2.Dispose();
  1063. });
  1064. var d3 = default(IDisposable);
  1065. var o3 = scheduler.CreateObserver<int>();
  1066. scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
  1067. scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });
  1068. var d4 = default(IDisposable);
  1069. var o4 = scheduler.CreateObserver<int>();
  1070. scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
  1071. scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });
  1072. scheduler.Start();
  1073. o1.Messages.AssertEqual(
  1074. OnNext(220, 2),
  1075. OnNext(230, 3)
  1076. );
  1077. o2.Messages.AssertEqual(
  1078. OnNext(230, 3),
  1079. OnNext(240, 4),
  1080. OnNext(250, 5),
  1081. OnNext(260, 6),
  1082. OnNext(270, 7)
  1083. );
  1084. o3.Messages.AssertEqual(
  1085. OnNext(260, 6)
  1086. );
  1087. o4.Messages.AssertEqual(
  1088. OnNext(290, 9),
  1089. OnCompleted<int>(300)
  1090. );
  1091. xs.Subscriptions.AssertEqual(
  1092. Subscribe(215, 284),
  1093. Subscribe(285, 300)
  1094. );
  1095. }
  1096. [TestMethod]
  1097. public void RefCount_DelayedDisconnect_minObservers_HotSourceMultipleSubscribers()
  1098. {
  1099. var scheduler = new TestScheduler();
  1100. var xs = scheduler.CreateHotObservable(
  1101. OnNext(210, 1), // 0 subscribers
  1102. OnNext(220, 2), // 1 subscriber
  1103. OnNext(230, 3), // 2 subscribers
  1104. OnNext(240, 4), // 1 subscriber
  1105. OnNext(250, 5), // 1 subscriber
  1106. OnNext(260, 6), // 2 subscribers
  1107. OnNext(270, 7), // 1 subscribers
  1108. OnNext(280, 8), // 0 subscribers
  1109. OnNext(290, 9), // 1 subscribers
  1110. OnNext(300, 10), // 2 subscribers
  1111. OnCompleted<int>(310)
  1112. );
  1113. var res = xs.Publish().RefCount(2, TimeSpan.FromTicks(9), scheduler);
  1114. var d1 = default(IDisposable);
  1115. var o1 = scheduler.CreateObserver<int>();
  1116. scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
  1117. scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });
  1118. var d2 = default(IDisposable);
  1119. var o2 = scheduler.CreateObserver<int>();
  1120. scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
  1121. scheduler.ScheduleAbsolute(275, () =>
  1122. {
  1123. d2.Dispose();
  1124. });
  1125. var d3 = default(IDisposable);
  1126. var o3 = scheduler.CreateObserver<int>();
  1127. scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
  1128. scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });
  1129. var d4 = default(IDisposable);
  1130. var o4 = scheduler.CreateObserver<int>();
  1131. scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
  1132. scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });
  1133. var d5 = default(IDisposable);
  1134. var o5 = scheduler.CreateObserver<int>();
  1135. scheduler.ScheduleAbsolute(295, () => { d5 = res.Subscribe(o5); });
  1136. scheduler.ScheduleAbsolute(320, () => { d5.Dispose(); });
  1137. scheduler.Start();
  1138. o1.Messages.AssertEqual(
  1139. OnNext(230, 3)
  1140. );
  1141. o2.Messages.AssertEqual(
  1142. OnNext(230, 3),
  1143. OnNext(240, 4),
  1144. OnNext(250, 5),
  1145. OnNext(260, 6),
  1146. OnNext(270, 7)
  1147. );
  1148. o3.Messages.AssertEqual(
  1149. OnNext(260, 6)
  1150. );
  1151. o4.Messages.AssertEqual(
  1152. OnNext(300, 10),
  1153. OnCompleted<int>(310)
  1154. );
  1155. o5.Messages.AssertEqual(
  1156. OnNext(300, 10),
  1157. OnCompleted<int>(310)
  1158. );
  1159. xs.Subscriptions.AssertEqual(
  1160. Subscribe(225, 284),
  1161. Subscribe(295, 310)
  1162. );
  1163. }
  1164. [TestMethod]
  1165. public void RefCount_DelayedDisconnect_minObservers_SubscriptionsDropBelowThresholdButNotToZero()
  1166. {
  1167. var subject = new ReplaySubject<int>(5);
  1168. var connected = 0;
  1169. var source = Observable.Defer(() =>
  1170. {
  1171. connected++;
  1172. return subject;
  1173. })
  1174. .Publish()
  1175. .RefCount(2, TimeSpan.FromMinutes(1));
  1176. subject.OnNext(1);
  1177. Assert.Equal(0, connected);
  1178. var list1 = new List<int>();
  1179. var sub1 = source.Subscribe(list1.Add);
  1180. Assert.Equal(0, connected);
  1181. Assert.Empty(list1);
  1182. subject.OnNext(2);
  1183. var list2 = new List<int>();
  1184. var sub2 = source.Subscribe(list2.Add);
  1185. // Since connection only occurred with the 2nd subscriber, we expect both to get everything
  1186. // the ReplaySubject has stored.
  1187. List<int> expectedSub1 = [1, 2];
  1188. var expectedSub2 = expectedSub1;
  1189. Assert.Equal(expectedSub1, list1);
  1190. Assert.Equal(expectedSub1, list2);
  1191. Assert.Equal(1, connected);
  1192. subject.OnNext(3);
  1193. // Both subscribers should have received the new item.
  1194. expectedSub1 = expectedSub2 = [1, 2, 3];
  1195. Assert.Equal(expectedSub1, list1);
  1196. Assert.Equal(expectedSub2, list2);
  1197. Assert.Equal(1, connected);
  1198. var list3 = new List<int>();
  1199. source.Subscribe(list3.Add);
  1200. // Since we were already connected, the 3rd subscriber just gets added to the observers of
  1201. // the Publish multicast output, and no new connection should occur to the underlying ReplaySubject.
  1202. // So for this 3rd subscription, no new items should be received by any of the subscribers
  1203. List<int> expectedSub3 = [];
  1204. Assert.Equal(expectedSub1, list1);
  1205. Assert.Equal(expectedSub2, list2);
  1206. Assert.Equal(expectedSub3, list3);
  1207. Assert.Equal(1, connected);
  1208. subject.OnNext(4);
  1209. // All the current subscribers should have received that latest item.
  1210. expectedSub1 = expectedSub2 = [1, 2, 3, 4];
  1211. expectedSub3 = [4];
  1212. Assert.Equal(expectedSub1, list1);
  1213. Assert.Equal(expectedSub2, list2);
  1214. Assert.Equal(expectedSub3, list3);
  1215. Assert.Equal(1, connected);
  1216. sub1.Dispose();
  1217. subject.OnNext(5);
  1218. // The two remaining subscribers should have received that new item, but the one that just
  1219. // unsubscribed should not.
  1220. expectedSub1 = [1, 2, 3, 4];
  1221. expectedSub2 = [1, 2, 3, 4, 5];
  1222. expectedSub3 = [4, 5];
  1223. Assert.Equal(expectedSub1, list1);
  1224. Assert.Equal(expectedSub2, list2);
  1225. Assert.Equal(expectedSub3, list3);
  1226. Assert.Equal(1, connected);
  1227. sub2.Dispose();
  1228. subject.OnNext(6);
  1229. // We are now below the minObservers threshold of 2, but that threshold only governs when we move
  1230. // from a disconnected state to a connected state. We should remain connected as long as there is
  1231. // at least one subscriber, so we expect the remaining subscriber to receive that last item.
  1232. expectedSub1 = [1, 2, 3, 4];
  1233. expectedSub2 = [1, 2, 3, 4, 5];
  1234. expectedSub3 = [4, 5, 6];
  1235. Assert.Equal(expectedSub1, list1);
  1236. Assert.Equal(expectedSub2, list2);
  1237. Assert.Equal(expectedSub3, list3);
  1238. Assert.Equal(1, connected);
  1239. }
  1240. [TestMethod]
  1241. public void RefCount_DelayedDisconnect_SubscriptionsDropBelowThresholdAndThenBackAbove()
  1242. {
  1243. var scheduler = new TestScheduler();
  1244. var sourceAfterInitial = new Subject<int>();
  1245. var connected = 0;
  1246. var source = Observable.Defer(() =>
  1247. {
  1248. connected++;
  1249. return Observable.Range(1, 5).Concat(sourceAfterInitial);
  1250. })
  1251. .Publish()
  1252. .RefCount(2, TimeSpan.FromTicks(10), scheduler);
  1253. Assert.Equal(0, connected);
  1254. var list1 = new List<int>();
  1255. var sub1 = source.Subscribe(list1.Add); // 1 subscriber
  1256. Assert.Equal(0, connected);
  1257. Assert.Empty(list1);
  1258. var list2 = new List<int>();
  1259. var sub2 = source.Subscribe(list2.Add); // 2 subscribers
  1260. Assert.Equal(1, connected);
  1261. sourceAfterInitial.OnNext(6);
  1262. sub1.Dispose(); // 1 subscriber
  1263. // We don't expect a disconnect, but provide enough time for one to occur, should that bug ever creep in
  1264. scheduler.AdvanceBy(10);
  1265. Assert.Equal(1, connected);
  1266. sourceAfterInitial.OnNext(7);
  1267. Assert.Equal(1, connected);
  1268. var list3 = new List<int>();
  1269. var sub3 = source.Subscribe(list3.Add);
  1270. // This is the distinguishing feature of this test. With that last subscription, we went from 1
  1271. // subscriber (below minObservers) but still connected (because we already hit minObservers once
  1272. // and never dropped to zero), and now we're passing through minObservers again. We used to have
  1273. // a bug where we would erroneously attempt to reconnect at this point.
  1274. Assert.Equal(1, connected);
  1275. sourceAfterInitial.OnNext(8);
  1276. var expectedSub1 = new List<int>([1, 2, 3, 4, 5, 6]);
  1277. var expectedSub2 = new List<int>([1, 2, 3, 4, 5, 6, 7, 8]);
  1278. var expectedSub3 = new List<int>([8]);
  1279. Assert.Equal(expectedSub1, list1);
  1280. Assert.Equal(expectedSub2, list2);
  1281. Assert.Equal(expectedSub3, list3);
  1282. }
  1283. [TestMethod]
  1284. public void RefCount_DelayedDisconnect_SubscriptionsDropToZeroThenNewSubscriptionArrivesBeforeDisconnectDelay()
  1285. {
  1286. var scheduler = new TestScheduler();
  1287. var source = new SerialSingleNotificationConnectable<int>(Notification.CreateOnNext(1));
  1288. var rco = source.RefCount(TimeSpan.FromTicks(10), scheduler);
  1289. var s1 = rco.Subscribe();
  1290. s1.Dispose();
  1291. // There are now 0 subscribers, but the time for the disconnect has not yet come.
  1292. Assert.Equal(1, source.Connections.Count);
  1293. Assert.False(source.Connections[0].Disposed);
  1294. scheduler.AdvanceBy(9);
  1295. // The time has still not come,
  1296. Assert.Equal(1, source.Connections.Count);
  1297. Assert.False(source.Connections[0].Disposed);
  1298. // Since we were still connected, this should move the connection from a 'waiting to
  1299. // shut down' state into an active state.
  1300. var seen = 0;
  1301. var terminated = false;
  1302. var s2 = rco.Subscribe(x => seen = x, () => terminated = true);
  1303. source.DeliverNotificationForActiveConnection(Notification.CreateOnNext(2));
  1304. Assert.Equal(2, seen);
  1305. Assert.False(terminated);
  1306. Assert.False(source.Connections[0].Disposed);
  1307. // This moves us past the time when `RefCount` would have shut down the connection if no new
  1308. // subscriptions had turned up.
  1309. scheduler.AdvanceBy(2);
  1310. Assert.False(terminated);
  1311. Assert.False(source.Connections[0].Disposed);
  1312. // We should be able to advance well beyond the disconnect delay because we have an active
  1313. // subscriber.
  1314. scheduler.AdvanceBy(20);
  1315. Assert.False(terminated);
  1316. Assert.False(source.Connections[0].Disposed);
  1317. }
  1318. [TestMethod]
  1319. public void RefCount_DelayedDisconnect_minObservers_SubscriptionsDropToZeroThenNewSubscriptionArrivesBeforeDisconnectDelay()
  1320. {
  1321. var scheduler = new TestScheduler();
  1322. var source = new SerialSingleNotificationConnectable<int>(Notification.CreateOnNext(1));
  1323. var rco = source.RefCount(2, TimeSpan.FromTicks(10), scheduler);
  1324. var s1 = rco.Subscribe();
  1325. var s2 = rco.Subscribe();
  1326. s1.Dispose();
  1327. s2.Dispose();
  1328. // There are now 0 subscribers, but the time for the disconnect has not yet come.
  1329. Assert.Equal(1, source.Connections.Count);
  1330. Assert.False(source.Connections[0].Disposed);
  1331. scheduler.AdvanceBy(9);
  1332. // The time has still not come,
  1333. Assert.Equal(1, source.Connections.Count);
  1334. Assert.False(source.Connections[0].Disposed);
  1335. // Since we were still connected, this should move the connection from a 'waiting to
  1336. // shut down' state into an active state. (We're below the minObservers threshold, but
  1337. // that just determines when Connect is called. RefCount has historically always waited
  1338. // for the subscription count to reach 0 before disconnecting, so if that count goes
  1339. // above 0 while we were waiting for the disconnect delay, it should return to an
  1340. // active state.)
  1341. var seen = 0;
  1342. var terminated = false;
  1343. var s3 = rco.Subscribe(x => seen = x, () => terminated = true);
  1344. source.DeliverNotificationForActiveConnection(Notification.CreateOnNext(2));
  1345. Assert.Equal(2, seen);
  1346. Assert.False(terminated);
  1347. Assert.False(source.Connections[0].Disposed);
  1348. // This moves us past the time when `RefCount` would have shut down the connection if
  1349. // no new subscriptions had turned up. The arrival of a new subscriber should ensure
  1350. // that we remain connected.
  1351. scheduler.AdvanceBy(2);
  1352. Assert.False(terminated);
  1353. Assert.False(source.Connections[0].Disposed);
  1354. // We should be able to advance well beyond the disconnect delay because we have an active
  1355. // subscriber.
  1356. scheduler.AdvanceBy(20);
  1357. Assert.False(terminated);
  1358. Assert.False(source.Connections[0].Disposed);
  1359. }
  1360. [TestMethod]
  1361. public void RefCount_DelayedDisconnect_ValuesDuringAndAfterSubscribe()
  1362. {
  1363. var subject = new ReplaySubject<int>(5);
  1364. var source = subject.Publish().RefCount(TimeSpan.FromSeconds(20));
  1365. subject.OnNext(1);
  1366. // Although the source is a ReplaySubject, the use of Publish means there will only be
  1367. // a single subscription to the ReplaySubject, so it will only replay one. (It will replay
  1368. // that first value on the initial connect.) So we expect each subscriber to see fewer and
  1369. // fewer values.
  1370. // all subscribers will see all the values
  1371. List<int> expected1 = [1];
  1372. var list1 = new List<int>();
  1373. source.Subscribe(list1.Add);
  1374. Assert.Equal(expected1, list1);
  1375. subject.OnNext(2);
  1376. var list2 = new List<int>();
  1377. source.Subscribe(list2.Add);
  1378. expected1 = [1, 2];
  1379. List<int> expected2 = [];
  1380. Assert.Equal(expected1, list1);
  1381. Assert.Equal(expected2, list2);
  1382. subject.OnNext(3);
  1383. var list3 = new List<int>();
  1384. source.Subscribe(list3.Add);
  1385. expected1 = [1, 2, 3];
  1386. expected2 = [3];
  1387. List<int> expected3 = [];
  1388. Assert.Equal(expected1, list1);
  1389. Assert.Equal(expected2, list2);
  1390. Assert.Equal(expected3, list3);
  1391. subject.OnNext(4);
  1392. expected1 = [1, 2, 3, 4];
  1393. expected2 = [3, 4];
  1394. expected3 = [4];
  1395. Assert.Equal(expected1, list1);
  1396. Assert.Equal(expected2, list2);
  1397. Assert.Equal(expected3, list3);
  1398. }
  1399. [TestMethod]
  1400. public void RefCount_DelayedDisconnect_minObservers_ValuesDuringAndAfterSubscribe()
  1401. {
  1402. var subject = new ReplaySubject<int>(5);
  1403. var source = subject.Publish().RefCount(2, TimeSpan.FromSeconds(20));
  1404. subject.OnNext(1);
  1405. var list1 = new List<int>();
  1406. source.Subscribe(list1.Add);
  1407. Assert.Empty(list1);
  1408. subject.OnNext(2);
  1409. List<int> expected1and2 = [1, 2];
  1410. var list2 = new List<int>();
  1411. source.Subscribe(list2.Add);
  1412. Assert.Equal(expected1and2, list1);
  1413. Assert.Equal(expected1and2, list2);
  1414. subject.OnNext(3);
  1415. expected1and2 = [1, 2, 3];
  1416. Assert.Equal(expected1and2, list1);
  1417. Assert.Equal(expected1and2, list2);
  1418. var list3 = new List<int>();
  1419. source.Subscribe(list3.Add);
  1420. List<int> expected3 = [];
  1421. Assert.Equal(expected1and2, list1);
  1422. Assert.Equal(expected1and2, list2);
  1423. Assert.Equal(expected3, list3);
  1424. subject.OnNext(4);
  1425. expected1and2 = [1, 2, 3, 4];
  1426. expected3 = [4];
  1427. Assert.Equal(expected1and2, list1);
  1428. Assert.Equal(expected1and2, list2);
  1429. Assert.Equal(expected3, list3);
  1430. }
  1431. [TestMethod]
  1432. [DataRow(true)]
  1433. [DataRow(false)]
  1434. public void RefCount_DelayedDisconnect_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletion(
  1435. bool reSubscribeBeforeDelayedDisconnect)
  1436. {
  1437. var scheduler = new TestScheduler();
  1438. var seen = 0;
  1439. var terminated = false;
  1440. // On initial subscription, the source will produce one value and will not complete.
  1441. var connectable = new SerialSingleNotificationConnectable<int>(Notification.CreateOnNext(36));
  1442. var refCount = connectable.RefCount(TimeSpan.FromTicks(10), scheduler);
  1443. using (refCount.Subscribe(value => seen = value, () => terminated = true))
  1444. {
  1445. Assert.Equal(36, seen);
  1446. Assert.Equal(1, connectable.Connections.Count);
  1447. Assert.False(connectable.Connections[0].Disposed);
  1448. }
  1449. Assert.False(connectable.Connections[0].Disposed);
  1450. // For these initial subscriptions, we allow enough time for the delayed disconnect to occur even if
  1451. // reSubscribeBeforeDelayedDisconnect is false, because it's the resubscription after a source-induced
  1452. // completion that this test is interested in.
  1453. scheduler.AdvanceBy(11);
  1454. Assert.Equal(1, connectable.Connections.Count);
  1455. Assert.True(connectable.Connections[0].Disposed);
  1456. seen = 0;
  1457. terminated = false;
  1458. // This time around, when Connect is called, all subscriptions after the preceding Connect will be
  1459. // completed.
  1460. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted<int>());
  1461. using (refCount.Subscribe(value => seen = value, () => terminated = true))
  1462. {
  1463. Assert.Equal(0, seen);
  1464. Assert.True(terminated);
  1465. Assert.Equal(2, connectable.Connections.Count);
  1466. Assert.False(connectable.Connections[1].Disposed);
  1467. }
  1468. Assert.Equal(2, connectable.Connections.Count);
  1469. Assert.False(connectable.Connections[1].Disposed);
  1470. scheduler.AdvanceBy(reSubscribeBeforeDelayedDisconnect ? 1 : 11);
  1471. Assert.Equal(2, connectable.Connections.Count);
  1472. Assert.Equal(!reSubscribeBeforeDelayedDisconnect, connectable.Connections[1].Disposed);
  1473. seen = 0;
  1474. terminated = false;
  1475. // Now we go back to the initial behaviour in which the source produces one value and does not complete.
  1476. connectable.SetNotificationForNextConnect(Notification.CreateOnNext(42));
  1477. using (refCount.Subscribe(value => seen = value, () => terminated = true))
  1478. {
  1479. Assert.Equal(reSubscribeBeforeDelayedDisconnect ? 0 : 42, seen);
  1480. Assert.Equal(reSubscribeBeforeDelayedDisconnect, terminated);
  1481. Assert.Equal(reSubscribeBeforeDelayedDisconnect ? 2 : 3, connectable.Connections.Count);
  1482. Assert.False(connectable.Connections[reSubscribeBeforeDelayedDisconnect ? 1 : 2].Disposed);
  1483. }
  1484. }
  1485. [TestMethod]
  1486. public void RefCount_DelayedDisconnect_minObservers_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletionAndEnoughTimeForDisconnectHasPassed()
  1487. {
  1488. var scheduler = new TestScheduler();
  1489. var seen1 = 0;
  1490. var seen2 = 0;
  1491. var terminated1 = false;
  1492. var terminated2 = false;
  1493. // On initial subscription, the source will produce one value and will not complete.
  1494. var connectable = new SerialSingleNotificationConnectable<int>(Notification.CreateOnNext(36));
  1495. var refCount = connectable.RefCount(2, TimeSpan.FromTicks(10), scheduler);
  1496. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1497. {
  1498. Assert.Equal(0, seen1);
  1499. Assert.Empty(connectable.Connections);
  1500. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1501. {
  1502. Assert.Equal(36, seen1);
  1503. Assert.Equal(36, seen2);
  1504. Assert.Equal(1, connectable.Connections.Count);
  1505. Assert.False(connectable.Connections[0].Disposed);
  1506. }
  1507. }
  1508. Assert.Equal(1, connectable.Connections.Count);
  1509. Assert.False(connectable.Connections[0].Disposed);
  1510. scheduler.AdvanceBy(11);
  1511. Assert.Equal(1, connectable.Connections.Count);
  1512. Assert.True(connectable.Connections[0].Disposed);
  1513. seen1 = seen2 = 0;
  1514. terminated1 = terminated2 = false;
  1515. // This time around, when Connect is called, all subscriptions after the preceding Connect will be
  1516. // completed.
  1517. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted<int>());
  1518. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1519. {
  1520. Assert.Equal(1, connectable.Connections.Count);
  1521. Assert.False(terminated1);
  1522. Assert.False(terminated2);
  1523. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1524. {
  1525. Assert.Equal(0, seen1);
  1526. Assert.Equal(0, seen2);
  1527. Assert.True(terminated1);
  1528. Assert.True(terminated2);
  1529. Assert.Equal(2, connectable.Connections.Count);
  1530. Assert.False(connectable.Connections[1].Disposed);
  1531. }
  1532. }
  1533. Assert.Equal(2, connectable.Connections.Count);
  1534. Assert.False(connectable.Connections[1].Disposed);
  1535. scheduler.AdvanceBy(11);
  1536. Assert.Equal(2, connectable.Connections.Count);
  1537. Assert.True(connectable.Connections[1].Disposed);
  1538. seen1 = seen2 = 0;
  1539. terminated1 = terminated2 = false;
  1540. // Now we go back to the initial behaviour in which the source produces one value and does not complete.
  1541. connectable.SetNotificationForNextConnect(Notification.CreateOnNext(42));
  1542. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1543. {
  1544. Assert.False(terminated1);
  1545. Assert.Equal(0, seen1);
  1546. Assert.False(terminated2);
  1547. Assert.Equal(2, connectable.Connections.Count);
  1548. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1549. {
  1550. Assert.Equal(42, seen1);
  1551. Assert.Equal(42, seen2);
  1552. Assert.False(terminated1);
  1553. Assert.False(terminated2);
  1554. Assert.Equal(3, connectable.Connections.Count);
  1555. Assert.False(connectable.Connections[2].Disposed);
  1556. }
  1557. }
  1558. }
  1559. [TestMethod]
  1560. public void RefCount_DelayedDisconnect_minObservers_CanConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletionAndEnoughTimeForDisconnectHasPassed_WithPreConnectNotifications()
  1561. {
  1562. var scheduler = new TestScheduler();
  1563. var seen1 = 0;
  1564. var seen2 = 0;
  1565. var terminated1 = false;
  1566. var terminated2 = false;
  1567. // On initial subscription, the source will produce one value and will not complete.
  1568. var connectable = new SerialConnectableIgnoringConnect<int>(new BehaviorSubject<int>(36));
  1569. var refCount = connectable.RefCount(2, TimeSpan.FromTicks(10), scheduler);
  1570. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1571. {
  1572. // The SerialConnectableConnectIgnoringObservable is unusual in that it can produce values before the
  1573. // call to Connect. So we expect to see the value from the source, but not yet to
  1574. // have seen a Connect call.
  1575. Assert.Equal(36, seen1);
  1576. Assert.Empty(connectable.Connections);
  1577. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1578. {
  1579. Assert.Equal(36, seen1);
  1580. Assert.Equal(36, seen2);
  1581. Assert.Equal(1, connectable.Connections.Count);
  1582. Assert.False(connectable.Connections[0].Disposed);
  1583. }
  1584. }
  1585. Assert.Equal(1, connectable.Connections.Count);
  1586. Assert.False(connectable.Connections[0].Disposed);
  1587. scheduler.AdvanceBy(11);
  1588. Assert.Equal(1, connectable.Connections.Count);
  1589. Assert.True(connectable.Connections[0].Disposed);
  1590. seen1 = seen2 = 0;
  1591. terminated1 = terminated2 = false;
  1592. // This time around, the source will complete when subscribed to.
  1593. connectable.SetSource(Observable.Empty<int>());
  1594. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1595. {
  1596. // Again, the SerialConnectableConnectIgnoringObservable's unsual behaviour of
  1597. // delivering notifications immediately from subscription without waiting for the
  1598. // Connect means we see the initial termination immediately (and no connection yet).
  1599. Assert.True(terminated1);
  1600. Assert.False(terminated2);
  1601. Assert.Equal(1, connectable.Connections.Count);
  1602. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1603. {
  1604. Assert.Equal(0, seen1);
  1605. Assert.Equal(0, seen2);
  1606. Assert.True(terminated1);
  1607. Assert.True(terminated2);
  1608. // Since the initial subscription completed immediately, the observer count
  1609. // never got above 1, so we do not expect a second connection
  1610. Assert.Equal(1, connectable.Connections.Count);
  1611. Assert.True(connectable.Connections[0].Disposed);
  1612. }
  1613. }
  1614. Assert.Equal(1, connectable.Connections.Count);
  1615. scheduler.AdvanceBy(11);
  1616. Assert.Equal(1, connectable.Connections.Count);
  1617. seen1 = seen2 = 0;
  1618. terminated1 = terminated2 = false;
  1619. // Now we go back to the initial behaviour in which the source produces one value and does not complete.
  1620. connectable.SetSource(new BehaviorSubject<int>(42));
  1621. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1622. {
  1623. Assert.False(terminated1);
  1624. Assert.Equal(42, seen1);
  1625. Assert.False(terminated2);
  1626. Assert.Equal(1, connectable.Connections.Count);
  1627. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1628. {
  1629. Assert.Equal(42, seen1);
  1630. Assert.Equal(42, seen2);
  1631. Assert.False(terminated1);
  1632. Assert.False(terminated2);
  1633. Assert.Equal(2, connectable.Connections.Count);
  1634. Assert.False(connectable.Connections[1].Disposed);
  1635. }
  1636. }
  1637. }
  1638. [TestMethod]
  1639. public void RefCount_DelayedDisconnect_minObservers_DoesNotConnectAgainIfPreviousSubscriptionTerminatedFromSubscribeByCompletionButNotEnoughTimeForDelayedDisconnectHasPassed()
  1640. {
  1641. var scheduler = new TestScheduler();
  1642. var seen1 = 0;
  1643. var seen2 = 0;
  1644. var terminated1 = false;
  1645. var terminated2 = false;
  1646. // On initial subscription, the source will produce one value and will not complete.
  1647. var connectable = new SerialSingleNotificationConnectable<int>(Notification.CreateOnNext(36));
  1648. var refCount = connectable.RefCount(2, TimeSpan.FromTicks(10), scheduler);
  1649. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1650. {
  1651. Assert.Equal(0, seen1);
  1652. Assert.Empty(connectable.Connections);
  1653. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1654. {
  1655. Assert.Equal(36, seen1);
  1656. Assert.Equal(36, seen2);
  1657. Assert.Equal(1, connectable.Connections.Count);
  1658. Assert.False(connectable.Connections[0].Disposed);
  1659. }
  1660. }
  1661. Assert.Equal(1, connectable.Connections.Count);
  1662. Assert.False(connectable.Connections[0].Disposed);
  1663. // For these initial subscriptions, we allow enough time for the delayed disconnect to occur, because
  1664. // it's the resubscription after a source-induced completion that this test is interested in.
  1665. scheduler.AdvanceBy(11);
  1666. Assert.Equal(1, connectable.Connections.Count);
  1667. Assert.True(connectable.Connections[0].Disposed);
  1668. seen1 = seen2 = 0;
  1669. terminated1 = terminated2 = false;
  1670. // Any further subscriptions will be completed on the next Connect.
  1671. connectable.SetNotificationForNextConnect(Notification.CreateOnCompleted<int>());
  1672. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1673. {
  1674. Assert.Equal(1, connectable.Connections.Count);
  1675. Assert.False(terminated1);
  1676. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1677. {
  1678. Assert.Equal(0, seen1);
  1679. Assert.Equal(0, seen2);
  1680. Assert.True(terminated1);
  1681. Assert.True(terminated2);
  1682. Assert.Equal(2, connectable.Connections.Count);
  1683. Assert.False(connectable.Connections[1].Disposed);
  1684. }
  1685. }
  1686. Assert.Equal(2, connectable.Connections.Count);
  1687. Assert.False(connectable.Connections[1].Disposed);
  1688. scheduler.AdvanceBy(5);
  1689. Assert.Equal(2, connectable.Connections.Count);
  1690. Assert.False(connectable.Connections[1].Disposed);
  1691. seen1 = seen2 = 0;
  1692. terminated1 = terminated2 = false;
  1693. // To verify that individual subscriptions continue to be forwarded to the underlying source even
  1694. // when no reconnect occurs, we arrange for subsequent subscriptions to get receive a single value.
  1695. // (This is a slightly odd thing to do, but it's not RefCount's place to have opinions on how the
  1696. // source should behave.)
  1697. connectable.Connections[1].ReplaceSource(new BehaviorSubject<int>(42));
  1698. // The connection set up in the preceding section won't be torn down until the
  1699. // specified disconnect delay has elapsed, so the expected behaviour if we try to establish
  1700. // new subscriptions in that time is that their Subscribe will be passed through to the source,
  1701. // and that we won't see any further connections. But now that the further subscriptions to the
  1702. // source will result in a value (even though earlier subscriptions to the same source have been
  1703. // completed) we expect these new subscriptions each to see the value.
  1704. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1705. {
  1706. Assert.False(terminated1);
  1707. Assert.Equal(42, seen1);
  1708. Assert.False(terminated2);
  1709. Assert.Equal(2, connectable.Connections.Count);
  1710. Assert.False(connectable.Connections[1].Disposed);
  1711. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1712. {
  1713. Assert.Equal(42, seen1);
  1714. Assert.Equal(42, seen2);
  1715. Assert.False(terminated1);
  1716. Assert.False(terminated2);
  1717. Assert.Equal(2, connectable.Connections.Count);
  1718. Assert.False(connectable.Connections[1].Disposed);
  1719. }
  1720. }
  1721. connectable.SetNotificationForNextConnect(Notification.CreateOnNext(99));
  1722. // If we advanced by enough for the deferred disconnect to occur, it should be able to create a fresh
  1723. // connection to the underlying source, at which point we'll see the value again.
  1724. // We were at 5, so this takes us to 11 since the initial connection, but we don't expect that to be
  1725. // enough, because the deferred disconnection should be relative to the most recent subscription.
  1726. scheduler.AdvanceBy(6);
  1727. Assert.Equal(2, connectable.Connections.Count);
  1728. Assert.False(connectable.Connections[1].Disposed);
  1729. // Since the last subscription occurred at 5, advancing to 16 should trigger disconnection. And
  1730. // since we're already up to 11, this should do it:
  1731. scheduler.AdvanceBy(5);
  1732. Assert.Equal(2, connectable.Connections.Count);
  1733. Assert.True(connectable.Connections[1].Disposed);
  1734. seen1 = seen2 = 0;
  1735. terminated1 = terminated2 = false;
  1736. using (refCount.Subscribe(value => seen1 = value, () => terminated1 = true))
  1737. {
  1738. Assert.Equal(0, seen1);
  1739. Assert.Equal(2, connectable.Connections.Count);
  1740. Assert.True(connectable.Connections[1].Disposed);
  1741. using (refCount.Subscribe(value => seen2 = value, () => terminated2 = true))
  1742. {
  1743. Assert.Equal(99, seen1);
  1744. Assert.Equal(99, seen2);
  1745. Assert.False(terminated1);
  1746. Assert.False(terminated2);
  1747. Assert.Equal(3, connectable.Connections.Count);
  1748. Assert.False(connectable.Connections[2].Disposed);
  1749. }
  1750. }
  1751. }
  1752. #endregion
  1753. }
  1754. }