// Source file: ConnectableObservableTest.cs
  // Licensed to the .NET Foundation under one or more agreements.
  // The .NET Foundation licenses this file to you under the MIT License.
  // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Linq;
  7. using System.Reactive.Subjects;
  8. using Microsoft.Reactive.Testing;
  9. using ReactiveTests.Dummies;
  10. using Microsoft.VisualStudio.TestTools.UnitTesting;
  11. using Assert = Xunit.Assert;
  namespace ReactiveTests.Tests
  {
      /// <summary>
      /// Tests for <see cref="ConnectableObservable{T}"/>, created either directly
      /// from a source and a subject or through <c>Multicast</c>/<c>Publish</c>.
      /// The common theme: subscribers only observe source values while a
      /// connection obtained from <c>Connect()</c> is active.
      /// </summary>
      [TestClass]
      public partial class ConnectableObservableTest : ReactiveTest
      {
          /// <summary>
          /// A subscriber attached before <c>Connect()</c> receives nothing until the
          /// connection is made; right after <c>Connect()</c> returns, the value
          /// produced by <c>Observable.Return(1)</c> has been delivered.
          /// </summary>
          [TestMethod]
          public void ConnectableObservable_Creation()
          {
              var y = 0;

              var s2 = new Subject<int>();
              var co2 = new ConnectableObservable<int>(Observable.Return(1), s2);

              co2.Subscribe(x => y = x);
              // Not connected yet, so the handler must not have run.
              Assert.NotEqual(1, y);

              co2.Connect();
              Assert.Equal(1, y);
          }

          /// <summary>
          /// When the connection is established before the scheduler runs, the
          /// observer sees every message of the hot source, including completion.
          /// </summary>
          [TestMethod]
          public void ConnectableObservable_Connected()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3),
                  OnNext(240, 4),
                  OnCompleted<int>(250)
              );

              var subject = new MySubject();
              var conn = new ConnectableObservable<int>(xs, subject);

              // The connection handle is deliberately discarded: this test never
              // disconnects, so the connection stays active for the whole run.
              _ = conn.Connect();

              var res = scheduler.Start(() => conn);

              res.Messages.AssertEqual(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3),
                  OnNext(240, 4),
                  OnCompleted<int>(250)
              );
          }

          /// <summary>
          /// Without any call to <c>Connect()</c> the observer receives no messages.
          /// </summary>
          [TestMethod]
          public void ConnectableObservable_NotConnected()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3),
                  OnNext(240, 4),
                  OnCompleted<int>(250)
              );

              var subject = new MySubject();
              var conn = new ConnectableObservable<int>(xs, subject);

              var res = scheduler.Start(() => conn);

              // Expecting an empty message list.
              res.Messages.AssertEqual();
          }

          /// <summary>
          /// A connection that is disposed before the scheduler runs delivers no
          /// messages to the observer.
          /// </summary>
          [TestMethod]
          public void ConnectableObservable_Disconnected()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3),
                  OnNext(240, 4),
                  OnCompleted<int>(250)
              );

              var subject = new MySubject();
              var conn = new ConnectableObservable<int>(xs, subject);

              // Connect and immediately tear the connection down again.
              var disconnect = conn.Connect();
              disconnect.Dispose();

              var res = scheduler.Start(() => conn);

              // Expecting an empty message list.
              res.Messages.AssertEqual();
          }

          /// <summary>
          /// A connection that is torn down part-way through the sequence delivers
          /// only the messages up to that point: values 1 through 3 and no completion.
          /// </summary>
          [TestMethod]
          public void ConnectableObservable_DisconnectFuture()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3),
                  OnNext(240, 4),
                  OnCompleted<int>(250)
              );

              var subject = new MySubject();
              var conn = new ConnectableObservable<int>(xs, subject);

              // NOTE(review): MySubject.DisposeOn is defined outside this file; it
              // presumably disposes the supplied connection once the value 3 passes
              // through the subject - confirm against MySubject.
              subject.DisposeOn(3, conn.Connect());

              var res = scheduler.Start(() => conn);

              res.Messages.AssertEqual(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3)
              );
          }

          /// <summary>
          /// Subscribing with a <c>default</c> (null) argument to a published
          /// observable throws <see cref="ArgumentNullException"/>.
          /// </summary>
          [TestMethod]
          public void ConnectableObservable_ArgumentChecking()
          {
              ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Publish().Subscribe(default));
          }

          /// <summary>
          /// Connects and disconnects three times in non-overlapping windows
          /// (225-241, 249-255 and 275-295) and disposes the first two handles
          /// repeatedly. Only values emitted inside a window reach the observer,
          /// and the source sees exactly one subscription per window.
          /// </summary>
          [TestMethod]
          public void ConnectableObservable_MultipleNonOverlappedConnections()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3),
                  OnNext(240, 4),
                  OnNext(250, 5),
                  OnNext(260, 6),
                  OnNext(270, 7),
                  OnNext(280, 8),
                  OnNext(290, 9),
                  OnCompleted<int>(300)
              );

              var subject = new Subject<int>();

              var conn = xs.Multicast(subject);

              // First window: 225-241. The later disposals of c1 overlap with the
              // second window and must neither throw nor affect it.
              var c1 = default(IDisposable);
              scheduler.ScheduleAbsolute(225, () => { c1 = conn.Connect(); });
              scheduler.ScheduleAbsolute(241, () => { c1.Dispose(); });
              scheduler.ScheduleAbsolute(245, () => { c1.Dispose(); }); // idempotency test
              scheduler.ScheduleAbsolute(251, () => { c1.Dispose(); }); // idempotency test
              scheduler.ScheduleAbsolute(260, () => { c1.Dispose(); }); // idempotency test

              // Second window: 249-255. Its last repeated disposal (at 280) falls
              // inside the third window.
              var c2 = default(IDisposable);
              scheduler.ScheduleAbsolute(249, () => { c2 = conn.Connect(); });
              scheduler.ScheduleAbsolute(255, () => { c2.Dispose(); });
              scheduler.ScheduleAbsolute(265, () => { c2.Dispose(); }); // idempotency test
              scheduler.ScheduleAbsolute(280, () => { c2.Dispose(); }); // idempotency test

              // Third window: 275-295, closed before the source completes at 300.
              var c3 = default(IDisposable);
              scheduler.ScheduleAbsolute(275, () => { c3 = conn.Connect(); });
              scheduler.ScheduleAbsolute(295, () => { c3.Dispose(); });

              var res = scheduler.Start(() => conn);

              // 3 and 4 arrive in the first window, 5 in the second, 8 and 9 in the
              // third; no completion is expected because no window covers time 300.
              res.Messages.AssertEqual(
                  OnNext(230, 3),
                  OnNext(240, 4),
                  OnNext(250, 5),
                  OnNext(280, 8),
                  OnNext(290, 9)
              );

              xs.Subscriptions.AssertEqual(
                  Subscribe(225, 241),
                  Subscribe(249, 255),
                  Subscribe(275, 295)
              );
          }
      }
  }