1
0

ReplaySubjectTest.cs 66 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Reactive;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Subjects;
  6. using Microsoft.Reactive.Testing;
  7. using Microsoft.VisualStudio.TestTools.UnitTesting;
  8. using ReactiveTests.Dummies;
  9. namespace ReactiveTests.Tests
  10. {
  11. [TestClass]
  12. public partial class ReplaySubjectTest : ReactiveTest
  13. {
  14. [TestMethod]
  15. public void Subscribe_ArgumentChecking()
  16. {
  17. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>().Subscribe(null));
  18. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(1).Subscribe(null));
  19. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(2).Subscribe(null));
  20. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(DummyScheduler.Instance).Subscribe(null));
  21. }
  22. [TestMethod]
  23. public void OnError_ArgumentChecking()
  24. {
  25. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>().OnError(null));
  26. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(1).OnError(null));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(2).OnError(null));
  28. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(DummyScheduler.Instance).OnError(null));
  29. }
  30. [TestMethod]
  31. public void Constructor_ArgumentChecking()
  32. {
  33. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1));
  34. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, DummyScheduler.Instance));
  35. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, TimeSpan.Zero));
  36. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, TimeSpan.Zero, DummyScheduler.Instance));
  37. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(TimeSpan.FromTicks(-1)));
  38. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  39. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1)));
  40. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  41. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(null));
  42. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(0, null));
  43. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(TimeSpan.Zero, null));
  44. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(0, TimeSpan.Zero, null));
  45. // zero allowed
  46. new ReplaySubject<int>(0);
  47. new ReplaySubject<int>(TimeSpan.Zero);
  48. new ReplaySubject<int>(0, TimeSpan.Zero);
  49. new ReplaySubject<int>(0, DummyScheduler.Instance);
  50. new ReplaySubject<int>(TimeSpan.Zero, DummyScheduler.Instance);
  51. new ReplaySubject<int>(0, TimeSpan.Zero, DummyScheduler.Instance);
  52. }
  53. [TestMethod]
  54. public void Infinite_ReplayByTime()
  55. {
  56. var scheduler = new TestScheduler();
  57. var xs = scheduler.CreateHotObservable(
  58. OnNext(70, 1),
  59. OnNext(110, 2),
  60. OnNext(220, 3),
  61. OnNext(270, 4),
  62. OnNext(340, 5),
  63. OnNext(410, 6),
  64. OnNext(520, 7),
  65. OnNext(630, 8),
  66. OnNext(710, 9),
  67. OnNext(870, 10),
  68. OnNext(940, 11),
  69. OnNext(1020, 12)
  70. );
  71. var subject = default(ReplaySubject<int>);
  72. var subscription = default(IDisposable);
  73. var results1 = scheduler.CreateObserver<int>();
  74. var subscription1 = default(IDisposable);
  75. var results2 = scheduler.CreateObserver<int>();
  76. var subscription2 = default(IDisposable);
  77. var results3 = scheduler.CreateObserver<int>();
  78. var subscription3 = default(IDisposable);
  79. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  80. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  81. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  82. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  83. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  84. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  85. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  86. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  87. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  88. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  89. scheduler.Start();
  90. results1.Messages.AssertEqual(
  91. OnNext(301, 3),
  92. OnNext(302, 4),
  93. OnNext(341, 5),
  94. OnNext(411, 6),
  95. OnNext(521, 7)
  96. );
  97. results2.Messages.AssertEqual(
  98. OnNext(401, 5),
  99. OnNext(411, 6),
  100. OnNext(521, 7),
  101. OnNext(631, 8)
  102. );
  103. results3.Messages.AssertEqual(
  104. OnNext(901, 10),
  105. OnNext(941, 11)
  106. );
  107. }
  108. [TestMethod]
  109. public void Infinite_ReplayOne()
  110. {
  111. var scheduler = new TestScheduler();
  112. var xs = scheduler.CreateHotObservable(
  113. OnNext(70, 1),
  114. OnNext(110, 2),
  115. OnNext(220, 3),
  116. OnNext(270, 4),
  117. OnNext(340, 5),
  118. OnNext(410, 6),
  119. OnNext(520, 7),
  120. OnNext(630, 8),
  121. OnNext(710, 9),
  122. OnNext(870, 10),
  123. OnNext(940, 11),
  124. OnNext(1020, 12)
  125. );
  126. var subject = default(ReplaySubject<int>);
  127. var subscription = default(IDisposable);
  128. var results1 = scheduler.CreateObserver<int>();
  129. var subscription1 = default(IDisposable);
  130. var results2 = scheduler.CreateObserver<int>();
  131. var subscription2 = default(IDisposable);
  132. var results3 = scheduler.CreateObserver<int>();
  133. var subscription3 = default(IDisposable);
  134. var results4 = scheduler.CreateObserver<int>();
  135. var subscription4 = default(IDisposable);
  136. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  137. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  138. scheduler.ScheduleAbsolute(1200, () => subscription.Dispose());
  139. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  140. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  141. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  142. scheduler.ScheduleAbsolute(1100, () => subscription4 = subject.Subscribe(results4));
  143. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  144. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  145. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  146. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  147. scheduler.Start();
  148. results1.Messages.AssertEqual(
  149. OnNext(300, 4),
  150. OnNext(340, 5),
  151. OnNext(410, 6),
  152. OnNext(520, 7)
  153. );
  154. results2.Messages.AssertEqual(
  155. OnNext(400, 5),
  156. OnNext(410, 6),
  157. OnNext(520, 7),
  158. OnNext(630, 8)
  159. );
  160. results3.Messages.AssertEqual(
  161. OnNext(900, 10),
  162. OnNext(940, 11)
  163. );
  164. results4.Messages.AssertEqual(
  165. OnNext(1100, 12)
  166. );
  167. }
  168. [TestMethod]
  169. public void Infinite_ReplayMany()
  170. {
  171. var scheduler = new TestScheduler();
  172. var xs = scheduler.CreateHotObservable(
  173. OnNext(70, 1),
  174. OnNext(110, 2),
  175. OnNext(220, 3),
  176. OnNext(270, 4),
  177. OnNext(340, 5),
  178. OnNext(410, 6),
  179. OnNext(520, 7),
  180. OnNext(630, 8),
  181. OnNext(710, 9),
  182. OnNext(870, 10),
  183. OnNext(940, 11),
  184. OnNext(1020, 12)
  185. );
  186. var subject = default(ReplaySubject<int>);
  187. var subscription = default(IDisposable);
  188. var results1 = scheduler.CreateObserver<int>();
  189. var subscription1 = default(IDisposable);
  190. var results2 = scheduler.CreateObserver<int>();
  191. var subscription2 = default(IDisposable);
  192. var results3 = scheduler.CreateObserver<int>();
  193. var subscription3 = default(IDisposable);
  194. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  195. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  196. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  197. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  198. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  199. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  200. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  201. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  202. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  203. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  204. scheduler.Start();
  205. results1.Messages.AssertEqual(
  206. OnNext(300, 3),
  207. OnNext(300, 4),
  208. OnNext(340, 5),
  209. OnNext(410, 6),
  210. OnNext(520, 7)
  211. );
  212. results2.Messages.AssertEqual(
  213. OnNext(400, 3),
  214. OnNext(400, 4),
  215. OnNext(400, 5),
  216. OnNext(410, 6),
  217. OnNext(520, 7),
  218. OnNext(630, 8)
  219. );
  220. results3.Messages.AssertEqual(
  221. OnNext(900, 8),
  222. OnNext(900, 9),
  223. OnNext(900, 10),
  224. OnNext(940, 11)
  225. );
  226. }
  227. [TestMethod]
  228. public void Infinite_ReplayAll()
  229. {
  230. var scheduler = new TestScheduler();
  231. var xs = scheduler.CreateHotObservable(
  232. OnNext(70, 1),
  233. OnNext(110, 2),
  234. OnNext(220, 3),
  235. OnNext(270, 4),
  236. OnNext(340, 5),
  237. OnNext(410, 6),
  238. OnNext(520, 7),
  239. OnNext(630, 8),
  240. OnNext(710, 9),
  241. OnNext(870, 10),
  242. OnNext(940, 11),
  243. OnNext(1020, 12)
  244. );
  245. var subject = default(ReplaySubject<int>);
  246. var subscription = default(IDisposable);
  247. var results1 = scheduler.CreateObserver<int>();
  248. var subscription1 = default(IDisposable);
  249. var results2 = scheduler.CreateObserver<int>();
  250. var subscription2 = default(IDisposable);
  251. var results3 = scheduler.CreateObserver<int>();
  252. var subscription3 = default(IDisposable);
  253. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  254. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  255. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  256. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  257. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  258. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  259. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  260. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  261. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  262. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  263. scheduler.Start();
  264. results1.Messages.AssertEqual(
  265. OnNext(300, 3),
  266. OnNext(300, 4),
  267. OnNext(340, 5),
  268. OnNext(410, 6),
  269. OnNext(520, 7)
  270. );
  271. results2.Messages.AssertEqual(
  272. OnNext(400, 3),
  273. OnNext(400, 4),
  274. OnNext(400, 5),
  275. OnNext(410, 6),
  276. OnNext(520, 7),
  277. OnNext(630, 8)
  278. );
  279. results3.Messages.AssertEqual(
  280. OnNext(900, 3),
  281. OnNext(900, 4),
  282. OnNext(900, 5),
  283. OnNext(900, 6),
  284. OnNext(900, 7),
  285. OnNext(900, 8),
  286. OnNext(900, 9),
  287. OnNext(900, 10),
  288. OnNext(940, 11)
  289. );
  290. }
  291. [TestMethod]
  292. public void Infinite2()
  293. {
  294. var scheduler = new TestScheduler();
  295. var xs = scheduler.CreateHotObservable(
  296. OnNext(70, 1),
  297. OnNext(110, 2),
  298. OnNext(220, 3),
  299. OnNext(270, 4),
  300. OnNext(280, -1),
  301. OnNext(290, -2),
  302. OnNext(340, 5),
  303. OnNext(410, 6),
  304. OnNext(520, 7),
  305. OnNext(630, 8),
  306. OnNext(710, 9),
  307. OnNext(870, 10),
  308. OnNext(940, 11),
  309. OnNext(1020, 12)
  310. );
  311. var subject = default(ReplaySubject<int>);
  312. var subscription = default(IDisposable);
  313. var results1 = scheduler.CreateObserver<int>();
  314. var subscription1 = default(IDisposable);
  315. var results2 = scheduler.CreateObserver<int>();
  316. var subscription2 = default(IDisposable);
  317. var results3 = scheduler.CreateObserver<int>();
  318. var subscription3 = default(IDisposable);
  319. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  320. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  321. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  322. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  323. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  324. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  325. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  326. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  327. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  328. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  329. scheduler.Start();
  330. results1.Messages.AssertEqual(
  331. OnNext(301, 4),
  332. OnNext(302, -1),
  333. OnNext(303, -2),
  334. OnNext(341, 5),
  335. OnNext(411, 6),
  336. OnNext(521, 7)
  337. );
  338. results2.Messages.AssertEqual(
  339. OnNext(401, 5),
  340. OnNext(411, 6),
  341. OnNext(521, 7),
  342. OnNext(631, 8)
  343. );
  344. results3.Messages.AssertEqual(
  345. OnNext(901, 10),
  346. OnNext(941, 11)
  347. );
  348. }
  349. [TestMethod]
  350. public void Finite_ReplayByTime()
  351. {
  352. var scheduler = new TestScheduler();
  353. var xs = scheduler.CreateHotObservable(
  354. OnNext(70, 1),
  355. OnNext(110, 2),
  356. OnNext(220, 3),
  357. OnNext(270, 4),
  358. OnNext(340, 5),
  359. OnNext(410, 6),
  360. OnNext(520, 7),
  361. OnCompleted<int>(630),
  362. OnNext(640, 9),
  363. OnCompleted<int>(650),
  364. OnError<int>(660, new Exception())
  365. );
  366. var subject = default(ReplaySubject<int>);
  367. var subscription = default(IDisposable);
  368. var results1 = scheduler.CreateObserver<int>();
  369. var subscription1 = default(IDisposable);
  370. var results2 = scheduler.CreateObserver<int>();
  371. var subscription2 = default(IDisposable);
  372. var results3 = scheduler.CreateObserver<int>();
  373. var subscription3 = default(IDisposable);
  374. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  375. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  376. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  377. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  378. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  379. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  380. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  381. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  382. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  383. scheduler.Start();
  384. results1.Messages.AssertEqual(
  385. OnNext(301, 3),
  386. OnNext(302, 4),
  387. OnNext(341, 5),
  388. OnNext(411, 6),
  389. OnNext(521, 7)
  390. );
  391. results2.Messages.AssertEqual(
  392. OnNext(401, 5),
  393. OnNext(411, 6),
  394. OnNext(521, 7),
  395. OnCompleted<int>(631)
  396. );
  397. results3.Messages.AssertEqual(
  398. OnCompleted<int>(901)
  399. );
  400. }
  401. [TestMethod]
  402. public void Finite_ReplayOne()
  403. {
  404. var scheduler = new TestScheduler();
  405. var xs = scheduler.CreateHotObservable(
  406. OnNext(70, 1),
  407. OnNext(110, 2),
  408. OnNext(220, 3),
  409. OnNext(270, 4),
  410. OnNext(340, 5),
  411. OnNext(410, 6),
  412. OnNext(520, 7),
  413. OnCompleted<int>(630),
  414. OnNext(640, 9),
  415. OnCompleted<int>(650),
  416. OnError<int>(660, new Exception())
  417. );
  418. var subject = default(ReplaySubject<int>);
  419. var subscription = default(IDisposable);
  420. var results1 = scheduler.CreateObserver<int>();
  421. var subscription1 = default(IDisposable);
  422. var results2 = scheduler.CreateObserver<int>();
  423. var subscription2 = default(IDisposable);
  424. var results3 = scheduler.CreateObserver<int>();
  425. var subscription3 = default(IDisposable);
  426. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  427. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  428. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  429. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  430. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  431. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  432. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  433. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  434. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  435. scheduler.Start();
  436. results1.Messages.AssertEqual(
  437. OnNext(300, 4),
  438. OnNext(340, 5),
  439. OnNext(410, 6),
  440. OnNext(520, 7)
  441. );
  442. results2.Messages.AssertEqual(
  443. OnNext(400, 5),
  444. OnNext(410, 6),
  445. OnNext(520, 7),
  446. OnCompleted<int>(630)
  447. );
  448. results3.Messages.AssertEqual(
  449. OnNext(900, 7),
  450. OnCompleted<int>(900)
  451. );
  452. }
  453. [TestMethod]
  454. public void Finite_ReplayMany()
  455. {
  456. var scheduler = new TestScheduler();
  457. var xs = scheduler.CreateHotObservable(
  458. OnNext(70, 1),
  459. OnNext(110, 2),
  460. OnNext(220, 3),
  461. OnNext(270, 4),
  462. OnNext(340, 5),
  463. OnNext(410, 6),
  464. OnNext(520, 7),
  465. OnCompleted<int>(630),
  466. OnNext(640, 9),
  467. OnCompleted<int>(650),
  468. OnError<int>(660, new Exception())
  469. );
  470. var subject = default(ReplaySubject<int>);
  471. var subscription = default(IDisposable);
  472. var results1 = scheduler.CreateObserver<int>();
  473. var subscription1 = default(IDisposable);
  474. var results2 = scheduler.CreateObserver<int>();
  475. var subscription2 = default(IDisposable);
  476. var results3 = scheduler.CreateObserver<int>();
  477. var subscription3 = default(IDisposable);
  478. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  479. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  480. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  481. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  482. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  483. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  484. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  485. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  486. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  487. scheduler.Start();
  488. results1.Messages.AssertEqual(
  489. OnNext(300, 3),
  490. OnNext(300, 4),
  491. OnNext(340, 5),
  492. OnNext(410, 6),
  493. OnNext(520, 7)
  494. );
  495. results2.Messages.AssertEqual(
  496. OnNext(400, 3),
  497. OnNext(400, 4),
  498. OnNext(400, 5),
  499. OnNext(410, 6),
  500. OnNext(520, 7),
  501. OnCompleted<int>(630)
  502. );
  503. results3.Messages.AssertEqual(
  504. OnNext(900, 5),
  505. OnNext(900, 6),
  506. OnNext(900, 7),
  507. OnCompleted<int>(900)
  508. );
  509. }
  510. [TestMethod]
  511. public void Finite_ReplayAll()
  512. {
  513. var scheduler = new TestScheduler();
  514. var xs = scheduler.CreateHotObservable(
  515. OnNext(70, 1),
  516. OnNext(110, 2),
  517. OnNext(220, 3),
  518. OnNext(270, 4),
  519. OnNext(340, 5),
  520. OnNext(410, 6),
  521. OnNext(520, 7),
  522. OnCompleted<int>(630),
  523. OnNext(640, 9),
  524. OnCompleted<int>(650),
  525. OnError<int>(660, new Exception())
  526. );
  527. var subject = default(ReplaySubject<int>);
  528. var subscription = default(IDisposable);
  529. var results1 = scheduler.CreateObserver<int>();
  530. var subscription1 = default(IDisposable);
  531. var results2 = scheduler.CreateObserver<int>();
  532. var subscription2 = default(IDisposable);
  533. var results3 = scheduler.CreateObserver<int>();
  534. var subscription3 = default(IDisposable);
  535. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  536. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  537. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  538. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  539. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  540. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  541. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  542. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  543. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  544. scheduler.Start();
  545. results1.Messages.AssertEqual(
  546. OnNext(300, 3),
  547. OnNext(300, 4),
  548. OnNext(340, 5),
  549. OnNext(410, 6),
  550. OnNext(520, 7)
  551. );
  552. results2.Messages.AssertEqual(
  553. OnNext(400, 3),
  554. OnNext(400, 4),
  555. OnNext(400, 5),
  556. OnNext(410, 6),
  557. OnNext(520, 7),
  558. OnCompleted<int>(630)
  559. );
  560. results3.Messages.AssertEqual(
  561. OnNext(900, 3),
  562. OnNext(900, 4),
  563. OnNext(900, 5),
  564. OnNext(900, 6),
  565. OnNext(900, 7),
  566. OnCompleted<int>(900)
  567. );
  568. }
  569. [TestMethod]
  570. public void Error_ReplayByTime()
  571. {
  572. var scheduler = new TestScheduler();
  573. var ex = new Exception();
  574. var xs = scheduler.CreateHotObservable(
  575. OnNext(70, 1),
  576. OnNext(110, 2),
  577. OnNext(220, 3),
  578. OnNext(270, 4),
  579. OnNext(340, 5),
  580. OnNext(410, 6),
  581. OnNext(520, 7),
  582. OnError<int>(630, ex),
  583. OnNext(640, 9),
  584. OnCompleted<int>(650),
  585. OnError<int>(660, new Exception())
  586. );
  587. var subject = default(ReplaySubject<int>);
  588. var subscription = default(IDisposable);
  589. var results1 = scheduler.CreateObserver<int>();
  590. var subscription1 = default(IDisposable);
  591. var results2 = scheduler.CreateObserver<int>();
  592. var subscription2 = default(IDisposable);
  593. var results3 = scheduler.CreateObserver<int>();
  594. var subscription3 = default(IDisposable);
  595. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  596. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  597. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  598. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  599. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  600. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  601. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  602. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  603. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  604. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  605. scheduler.Start();
  606. results1.Messages.AssertEqual(
  607. OnNext(301, 3),
  608. OnNext(302, 4),
  609. OnNext(341, 5),
  610. OnNext(411, 6),
  611. OnNext(521, 7)
  612. );
  613. results2.Messages.AssertEqual(
  614. OnNext(401, 5),
  615. OnNext(411, 6),
  616. OnNext(521, 7),
  617. OnError<int>(631, ex)
  618. );
  619. results3.Messages.AssertEqual(
  620. OnError<int>(901, ex)
  621. );
  622. }
  623. [TestMethod]
  624. public void Error_ReplayOne()
  625. {
  626. var scheduler = new TestScheduler();
  627. var ex = new Exception();
  628. var xs = scheduler.CreateHotObservable(
  629. OnNext(70, 1),
  630. OnNext(110, 2),
  631. OnNext(220, 3),
  632. OnNext(270, 4),
  633. OnNext(340, 5),
  634. OnNext(410, 6),
  635. OnNext(520, 7),
  636. OnError<int>(630, ex),
  637. OnNext(640, 9),
  638. OnCompleted<int>(650),
  639. OnError<int>(660, new Exception())
  640. );
  641. var subject = default(ReplaySubject<int>);
  642. var subscription = default(IDisposable);
  643. var results1 = scheduler.CreateObserver<int>();
  644. var subscription1 = default(IDisposable);
  645. var results2 = scheduler.CreateObserver<int>();
  646. var subscription2 = default(IDisposable);
  647. var results3 = scheduler.CreateObserver<int>();
  648. var subscription3 = default(IDisposable);
  649. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  650. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  651. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  652. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  653. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  654. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  655. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  656. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  657. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  658. scheduler.Start();
  659. results1.Messages.AssertEqual(
  660. OnNext(300, 4),
  661. OnNext(340, 5),
  662. OnNext(410, 6),
  663. OnNext(520, 7)
  664. );
  665. results2.Messages.AssertEqual(
  666. OnNext(400, 5),
  667. OnNext(410, 6),
  668. OnNext(520, 7),
  669. OnError<int>(630, ex)
  670. );
  671. results3.Messages.AssertEqual(
  672. OnNext(900, 7),
  673. OnError<int>(900, ex)
  674. );
  675. }
  676. [TestMethod]
  677. public void Error_ReplayMany()
  678. {
  679. var scheduler = new TestScheduler();
  680. var ex = new Exception();
  681. var xs = scheduler.CreateHotObservable(
  682. OnNext(70, 1),
  683. OnNext(110, 2),
  684. OnNext(220, 3),
  685. OnNext(270, 4),
  686. OnNext(340, 5),
  687. OnNext(410, 6),
  688. OnNext(520, 7),
  689. OnError<int>(630, ex),
  690. OnNext(640, 9),
  691. OnCompleted<int>(650),
  692. OnError<int>(660, new Exception())
  693. );
  694. var subject = default(ReplaySubject<int>);
  695. var subscription = default(IDisposable);
  696. var results1 = scheduler.CreateObserver<int>();
  697. var subscription1 = default(IDisposable);
  698. var results2 = scheduler.CreateObserver<int>();
  699. var subscription2 = default(IDisposable);
  700. var results3 = scheduler.CreateObserver<int>();
  701. var subscription3 = default(IDisposable);
  702. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  703. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  704. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  705. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  706. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  707. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  708. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  709. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  710. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  711. scheduler.Start();
  712. results1.Messages.AssertEqual(
  713. OnNext(300, 3),
  714. OnNext(300, 4),
  715. OnNext(340, 5),
  716. OnNext(410, 6),
  717. OnNext(520, 7)
  718. );
  719. results2.Messages.AssertEqual(
  720. OnNext(400, 3),
  721. OnNext(400, 4),
  722. OnNext(400, 5),
  723. OnNext(410, 6),
  724. OnNext(520, 7),
  725. OnError<int>(630, ex)
  726. );
  727. results3.Messages.AssertEqual(
  728. OnNext(900, 5),
  729. OnNext(900, 6),
  730. OnNext(900, 7),
  731. OnError<int>(900, ex)
  732. );
  733. }
  734. [TestMethod]
  735. public void Error_ReplayAll()
  736. {
  737. var scheduler = new TestScheduler();
  738. var ex = new Exception();
  739. var xs = scheduler.CreateHotObservable(
  740. OnNext(70, 1),
  741. OnNext(110, 2),
  742. OnNext(220, 3),
  743. OnNext(270, 4),
  744. OnNext(340, 5),
  745. OnNext(410, 6),
  746. OnNext(520, 7),
  747. OnError<int>(630, ex),
  748. OnNext(640, 9),
  749. OnCompleted<int>(650),
  750. OnError<int>(660, new Exception())
  751. );
  752. var subject = default(ReplaySubject<int>);
  753. var subscription = default(IDisposable);
  754. var results1 = scheduler.CreateObserver<int>();
  755. var subscription1 = default(IDisposable);
  756. var results2 = scheduler.CreateObserver<int>();
  757. var subscription2 = default(IDisposable);
  758. var results3 = scheduler.CreateObserver<int>();
  759. var subscription3 = default(IDisposable);
  760. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  761. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  762. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  763. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  764. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  765. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  766. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  767. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  768. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  769. scheduler.Start();
  770. results1.Messages.AssertEqual(
  771. OnNext(300, 3),
  772. OnNext(300, 4),
  773. OnNext(340, 5),
  774. OnNext(410, 6),
  775. OnNext(520, 7)
  776. );
  777. results2.Messages.AssertEqual(
  778. OnNext(400, 3),
  779. OnNext(400, 4),
  780. OnNext(400, 5),
  781. OnNext(410, 6),
  782. OnNext(520, 7),
  783. OnError<int>(630, ex)
  784. );
  785. results3.Messages.AssertEqual(
  786. OnNext(900, 3),
  787. OnNext(900, 4),
  788. OnNext(900, 5),
  789. OnNext(900, 6),
  790. OnNext(900, 7),
  791. OnError<int>(900, ex)
  792. );
  793. }
  794. [TestMethod]
  795. public void Canceled_ReplayByTime()
  796. {
  797. var scheduler = new TestScheduler();
  798. var xs = scheduler.CreateHotObservable(
  799. OnCompleted<int>(630),
  800. OnNext(640, 9),
  801. OnCompleted<int>(650),
  802. OnError<int>(660, new Exception())
  803. );
  804. var subject = default(ReplaySubject<int>);
  805. var subscription = default(IDisposable);
  806. var results1 = scheduler.CreateObserver<int>();
  807. var subscription1 = default(IDisposable);
  808. var results2 = scheduler.CreateObserver<int>();
  809. var subscription2 = default(IDisposable);
  810. var results3 = scheduler.CreateObserver<int>();
  811. var subscription3 = default(IDisposable);
  812. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  813. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  814. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  815. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  816. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  817. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  818. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  819. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  820. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  821. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  822. scheduler.Start();
  823. results1.Messages.AssertEqual(
  824. );
  825. results2.Messages.AssertEqual(
  826. OnCompleted<int>(631)
  827. );
  828. results3.Messages.AssertEqual(
  829. OnCompleted<int>(901)
  830. );
  831. }
  832. [TestMethod]
  833. public void Canceled_ReplayOne()
  834. {
  835. var scheduler = new TestScheduler();
  836. var xs = scheduler.CreateHotObservable(
  837. OnCompleted<int>(630),
  838. OnNext(640, 9),
  839. OnCompleted<int>(650),
  840. OnError<int>(660, new Exception())
  841. );
  842. var subject = default(ReplaySubject<int>);
  843. var subscription = default(IDisposable);
  844. var results1 = scheduler.CreateObserver<int>();
  845. var subscription1 = default(IDisposable);
  846. var results2 = scheduler.CreateObserver<int>();
  847. var subscription2 = default(IDisposable);
  848. var results3 = scheduler.CreateObserver<int>();
  849. var subscription3 = default(IDisposable);
  850. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  851. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  852. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  853. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  854. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  855. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  856. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  857. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  858. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  859. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  860. scheduler.Start();
  861. results1.Messages.AssertEqual(
  862. );
  863. results2.Messages.AssertEqual(
  864. OnCompleted<int>(630)
  865. );
  866. results3.Messages.AssertEqual(
  867. OnCompleted<int>(900)
  868. );
  869. }
  870. [TestMethod]
  871. public void Canceled_ReplayMany()
  872. {
  873. var scheduler = new TestScheduler();
  874. var xs = scheduler.CreateHotObservable(
  875. OnCompleted<int>(630),
  876. OnNext(640, 9),
  877. OnCompleted<int>(650),
  878. OnError<int>(660, new Exception())
  879. );
  880. var subject = default(ReplaySubject<int>);
  881. var subscription = default(IDisposable);
  882. var results1 = scheduler.CreateObserver<int>();
  883. var subscription1 = default(IDisposable);
  884. var results2 = scheduler.CreateObserver<int>();
  885. var subscription2 = default(IDisposable);
  886. var results3 = scheduler.CreateObserver<int>();
  887. var subscription3 = default(IDisposable);
  888. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  889. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  890. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  891. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  892. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  893. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  894. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  895. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  896. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  897. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  898. scheduler.Start();
  899. results1.Messages.AssertEqual(
  900. );
  901. results2.Messages.AssertEqual(
  902. OnCompleted<int>(630)
  903. );
  904. results3.Messages.AssertEqual(
  905. OnCompleted<int>(900)
  906. );
  907. }
  908. [TestMethod]
  909. public void Canceled_ReplayAll()
  910. {
  911. var scheduler = new TestScheduler();
  912. var xs = scheduler.CreateHotObservable(
  913. OnCompleted<int>(630),
  914. OnNext(640, 9),
  915. OnCompleted<int>(650),
  916. OnError<int>(660, new Exception())
  917. );
  918. var subject = default(ReplaySubject<int>);
  919. var subscription = default(IDisposable);
  920. var results1 = scheduler.CreateObserver<int>();
  921. var subscription1 = default(IDisposable);
  922. var results2 = scheduler.CreateObserver<int>();
  923. var subscription2 = default(IDisposable);
  924. var results3 = scheduler.CreateObserver<int>();
  925. var subscription3 = default(IDisposable);
  926. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  927. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  928. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  929. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  930. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  931. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  932. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  933. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  934. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  935. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  936. scheduler.Start();
  937. results1.Messages.AssertEqual(
  938. );
  939. results2.Messages.AssertEqual(
  940. OnCompleted<int>(630)
  941. );
  942. results3.Messages.AssertEqual(
  943. OnCompleted<int>(900)
  944. );
  945. }
  946. [TestMethod]
  947. public void SubjectDisposed()
  948. {
  949. var scheduler = new TestScheduler();
  950. var subject = default(ReplaySubject<int>);
  951. var results1 = scheduler.CreateObserver<int>();
  952. var subscription1 = default(IDisposable);
  953. var results2 = scheduler.CreateObserver<int>();
  954. var subscription2 = default(IDisposable);
  955. var results3 = scheduler.CreateObserver<int>();
  956. var subscription3 = default(IDisposable);
  957. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(scheduler));
  958. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  959. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  960. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  961. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  962. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  963. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  964. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  965. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  966. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  967. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  968. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  969. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  970. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  971. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  972. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  973. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  974. scheduler.Start();
  975. results1.Messages.AssertEqual(
  976. OnNext(201, 1),
  977. OnNext(251, 2),
  978. OnNext(351, 3),
  979. OnNext(451, 4)
  980. );
  981. results2.Messages.AssertEqual(
  982. OnNext(301, 1),
  983. OnNext(302, 2),
  984. OnNext(351, 3),
  985. OnNext(451, 4),
  986. OnNext(551, 5)
  987. );
  988. results3.Messages.AssertEqual(
  989. OnNext(401, 1),
  990. OnNext(402, 2),
  991. OnNext(403, 3),
  992. OnNext(451, 4),
  993. OnNext(551, 5)
  994. );
  995. }
  996. [TestMethod]
  997. public void SubjectDisposed_ReplayOne()
  998. {
  999. var scheduler = new TestScheduler();
  1000. var subject = default(ReplaySubject<int>);
  1001. var results1 = scheduler.CreateObserver<int>();
  1002. var subscription1 = default(IDisposable);
  1003. var results2 = scheduler.CreateObserver<int>();
  1004. var subscription2 = default(IDisposable);
  1005. var results3 = scheduler.CreateObserver<int>();
  1006. var subscription3 = default(IDisposable);
  1007. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  1008. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1009. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1010. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1011. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1012. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1013. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1014. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1015. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1016. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1017. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1018. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1019. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1020. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1021. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1022. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1023. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1024. scheduler.Start();
  1025. results1.Messages.AssertEqual(
  1026. OnNext(200, 1),
  1027. OnNext(250, 2),
  1028. OnNext(350, 3),
  1029. OnNext(450, 4)
  1030. );
  1031. results2.Messages.AssertEqual(
  1032. OnNext(300, 2),
  1033. OnNext(350, 3),
  1034. OnNext(450, 4),
  1035. OnNext(550, 5)
  1036. );
  1037. results3.Messages.AssertEqual(
  1038. OnNext(400, 3),
  1039. OnNext(450, 4),
  1040. OnNext(550, 5)
  1041. );
  1042. }
  1043. [TestMethod]
  1044. public void SubjectDisposed_ReplayMany()
  1045. {
  1046. var scheduler = new TestScheduler();
  1047. var subject = default(ReplaySubject<int>);
  1048. var results1 = scheduler.CreateObserver<int>();
  1049. var subscription1 = default(IDisposable);
  1050. var results2 = scheduler.CreateObserver<int>();
  1051. var subscription2 = default(IDisposable);
  1052. var results3 = scheduler.CreateObserver<int>();
  1053. var subscription3 = default(IDisposable);
  1054. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  1055. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1056. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1057. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1058. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1059. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1060. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1061. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1062. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1063. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1064. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1065. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1066. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1067. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1068. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1069. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1070. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1071. scheduler.Start();
  1072. results1.Messages.AssertEqual(
  1073. OnNext(200, 1),
  1074. OnNext(250, 2),
  1075. OnNext(350, 3),
  1076. OnNext(450, 4)
  1077. );
  1078. results2.Messages.AssertEqual(
  1079. OnNext(300, 1),
  1080. OnNext(300, 2),
  1081. OnNext(350, 3),
  1082. OnNext(450, 4),
  1083. OnNext(550, 5)
  1084. );
  1085. results3.Messages.AssertEqual(
  1086. OnNext(400, 1),
  1087. OnNext(400, 2),
  1088. OnNext(400, 3),
  1089. OnNext(450, 4),
  1090. OnNext(550, 5)
  1091. );
  1092. }
  1093. [TestMethod]
  1094. public void SubjectDisposed_ReplayAll()
  1095. {
  1096. var scheduler = new TestScheduler();
  1097. var subject = default(ReplaySubject<int>);
  1098. var results1 = scheduler.CreateObserver<int>();
  1099. var subscription1 = default(IDisposable);
  1100. var results2 = scheduler.CreateObserver<int>();
  1101. var subscription2 = default(IDisposable);
  1102. var results3 = scheduler.CreateObserver<int>();
  1103. var subscription3 = default(IDisposable);
  1104. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  1105. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1106. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1107. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1108. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1109. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1110. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1111. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1112. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1113. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1114. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1115. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1116. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1117. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1118. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1119. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1120. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1121. scheduler.Start();
  1122. results1.Messages.AssertEqual(
  1123. OnNext(200, 1),
  1124. OnNext(250, 2),
  1125. OnNext(350, 3),
  1126. OnNext(450, 4)
  1127. );
  1128. results2.Messages.AssertEqual(
  1129. OnNext(300, 1),
  1130. OnNext(300, 2),
  1131. OnNext(350, 3),
  1132. OnNext(450, 4),
  1133. OnNext(550, 5)
  1134. );
  1135. results3.Messages.AssertEqual(
  1136. OnNext(400, 1),
  1137. OnNext(400, 2),
  1138. OnNext(400, 3),
  1139. OnNext(450, 4),
  1140. OnNext(550, 5)
  1141. );
  1142. }
  1143. //TODO: Create a failing test for this for the other implementations (ReplayOne/Many/All).
  1144. //I Don't understand the behavior.
  1145. //I think it may have to do with calling Trim() on Subscription (as well as in the OnNext calls). -LC
  1146. [TestMethod]
  1147. public void ReplaySubjectDiesOut()
  1148. {
  1149. //
  1150. // Tests v1.x behavior as documented in ReplaySubject.cs (Subscribe method).
  1151. //
  1152. var scheduler = new TestScheduler();
  1153. var xs = scheduler.CreateHotObservable(
  1154. OnNext(70, 1),
  1155. OnNext(110, 2),
  1156. OnNext(220, 3),
  1157. OnNext(270, 4),
  1158. OnNext(340, 5),
  1159. OnNext(410, 6),
  1160. OnNext(520, 7),
  1161. OnCompleted<int>(580)
  1162. );
  1163. var subject = default(ReplaySubject<int>);
  1164. var results1 = scheduler.CreateObserver<int>();
  1165. var results2 = scheduler.CreateObserver<int>();
  1166. var results3 = scheduler.CreateObserver<int>();
  1167. var results4 = scheduler.CreateObserver<int>();
  1168. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(int.MaxValue, TimeSpan.FromTicks(100), scheduler));
  1169. scheduler.ScheduleAbsolute(200, () => xs.Subscribe(subject));
  1170. scheduler.ScheduleAbsolute(300, () => subject.Subscribe(results1));
  1171. scheduler.ScheduleAbsolute(400, () => subject.Subscribe(results2));
  1172. scheduler.ScheduleAbsolute(600, () => subject.Subscribe(results3));
  1173. scheduler.ScheduleAbsolute(900, () => subject.Subscribe(results4));
  1174. scheduler.Start();
  1175. results1.Messages.AssertEqual(
  1176. OnNext(301, 3),
  1177. OnNext(302, 4),
  1178. OnNext(341, 5),
  1179. OnNext(411, 6),
  1180. OnNext(521, 7),
  1181. OnCompleted<int>(581)
  1182. );
  1183. results2.Messages.AssertEqual(
  1184. OnNext(401, 5),
  1185. OnNext(411, 6),
  1186. OnNext(521, 7),
  1187. OnCompleted<int>(581)
  1188. );
  1189. results3.Messages.AssertEqual(
  1190. OnNext(601, 7),
  1191. OnCompleted<int>(602)
  1192. );
  1193. results4.Messages.AssertEqual(
  1194. OnCompleted<int>(901)
  1195. );
  1196. }
  1197. [TestMethod]
  1198. public void HasObservers()
  1199. {
  1200. HasObservers(new ReplaySubject<int>());
  1201. HasObservers(new ReplaySubject<int>(1));
  1202. HasObservers(new ReplaySubject<int>(3));
  1203. HasObservers(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1204. }
  1205. private static void HasObservers(ReplaySubject<int> s)
  1206. {
  1207. Assert.IsFalse(s.HasObservers);
  1208. var d1 = s.Subscribe(_ => { });
  1209. Assert.IsTrue(s.HasObservers);
  1210. d1.Dispose();
  1211. Assert.IsFalse(s.HasObservers);
  1212. var d2 = s.Subscribe(_ => { });
  1213. Assert.IsTrue(s.HasObservers);
  1214. var d3 = s.Subscribe(_ => { });
  1215. Assert.IsTrue(s.HasObservers);
  1216. d2.Dispose();
  1217. Assert.IsTrue(s.HasObservers);
  1218. d3.Dispose();
  1219. Assert.IsFalse(s.HasObservers);
  1220. }
  1221. [TestMethod]
  1222. public void HasObservers_Dispose1()
  1223. {
  1224. HasObservers_Dispose1(new ReplaySubject<int>());
  1225. HasObservers_Dispose1(new ReplaySubject<int>(1));
  1226. HasObservers_Dispose1(new ReplaySubject<int>(3));
  1227. HasObservers_Dispose1(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1228. }
  1229. private static void HasObservers_Dispose1(ReplaySubject<int> s)
  1230. {
  1231. Assert.IsFalse(s.HasObservers);
  1232. var d = s.Subscribe(_ => { });
  1233. Assert.IsTrue(s.HasObservers);
  1234. s.Dispose();
  1235. Assert.IsFalse(s.HasObservers);
  1236. d.Dispose();
  1237. Assert.IsFalse(s.HasObservers);
  1238. }
  1239. [TestMethod]
  1240. public void HasObservers_Dispose2()
  1241. {
  1242. HasObservers_Dispose2(new ReplaySubject<int>());
  1243. HasObservers_Dispose2(new ReplaySubject<int>(1));
  1244. HasObservers_Dispose2(new ReplaySubject<int>(3));
  1245. HasObservers_Dispose2(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1246. }
  1247. private static void HasObservers_Dispose2(ReplaySubject<int> s)
  1248. {
  1249. Assert.IsFalse(s.HasObservers);
  1250. var d = s.Subscribe(_ => { });
  1251. Assert.IsTrue(s.HasObservers);
  1252. d.Dispose();
  1253. Assert.IsFalse(s.HasObservers);
  1254. s.Dispose();
  1255. Assert.IsFalse(s.HasObservers);
  1256. }
  1257. [TestMethod]
  1258. public void HasObservers_Dispose3()
  1259. {
  1260. HasObservers_Dispose3(new ReplaySubject<int>());
  1261. HasObservers_Dispose3(new ReplaySubject<int>(1));
  1262. HasObservers_Dispose3(new ReplaySubject<int>(3));
  1263. HasObservers_Dispose3(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1264. }
  1265. private static void HasObservers_Dispose3(ReplaySubject<int> s)
  1266. {
  1267. Assert.IsFalse(s.HasObservers);
  1268. s.Dispose();
  1269. Assert.IsFalse(s.HasObservers);
  1270. }
  1271. [TestMethod]
  1272. public void HasObservers_OnCompleted()
  1273. {
  1274. HasObservers_OnCompleted(new ReplaySubject<int>());
  1275. HasObservers_OnCompleted(new ReplaySubject<int>(1));
  1276. HasObservers_OnCompleted(new ReplaySubject<int>(3));
  1277. HasObservers_OnCompleted(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1278. }
  1279. private static void HasObservers_OnCompleted(ReplaySubject<int> s)
  1280. {
  1281. Assert.IsFalse(s.HasObservers);
  1282. var d = s.Subscribe(_ => { });
  1283. Assert.IsTrue(s.HasObservers);
  1284. s.OnNext(42);
  1285. Assert.IsTrue(s.HasObservers);
  1286. s.OnCompleted();
  1287. Assert.IsFalse(s.HasObservers);
  1288. }
  1289. [TestMethod]
  1290. public void HasObservers_OnError()
  1291. {
  1292. HasObservers_OnError(new ReplaySubject<int>());
  1293. HasObservers_OnError(new ReplaySubject<int>(1));
  1294. HasObservers_OnError(new ReplaySubject<int>(3));
  1295. HasObservers_OnError(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1296. }
  1297. private static void HasObservers_OnError(ReplaySubject<int> s)
  1298. {
  1299. Assert.IsFalse(s.HasObservers);
  1300. var d = s.Subscribe(_ => { }, ex => { });
  1301. Assert.IsTrue(s.HasObservers);
  1302. s.OnNext(42);
  1303. Assert.IsTrue(s.HasObservers);
  1304. s.OnError(new Exception());
  1305. Assert.IsFalse(s.HasObservers);
  1306. }
  1307. //Potentially already covered by Finite_* tests
  1308. [TestMethod]
  1309. public void Completed_to_late_subscriber_ReplayAll()
  1310. {
  1311. var s = new ReplaySubject<int>();
  1312. s.OnNext(1);
  1313. s.OnNext(2);
  1314. s.OnCompleted();
  1315. var scheduler = new TestScheduler();
  1316. var observer = scheduler.CreateObserver<int>();
  1317. s.Subscribe(observer);
  1318. Assert.AreEqual(3, observer.Messages.Count);
  1319. Assert.AreEqual(1, observer.Messages[0].Value.Value);
  1320. Assert.AreEqual(2, observer.Messages[1].Value.Value);
  1321. Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind);
  1322. }
  1323. [TestMethod]
  1324. public void Completed_to_late_subscriber_ReplayOne()
  1325. {
  1326. var s = new ReplaySubject<int>(1);
  1327. s.OnNext(1);
  1328. s.OnNext(2);
  1329. s.OnCompleted();
  1330. var scheduler = new TestScheduler();
  1331. var observer = scheduler.CreateObserver<int>();
  1332. s.Subscribe(observer);
  1333. Assert.AreEqual(2, observer.Messages.Count);
  1334. Assert.AreEqual(2, observer.Messages[0].Value.Value);
  1335. Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[1].Value.Kind);
  1336. }
  1337. [TestMethod]
  1338. public void Completed_to_late_subscriber_ReplayMany()
  1339. {
  1340. var s = new ReplaySubject<int>(2);
  1341. s.OnNext(1);
  1342. s.OnNext(2);
  1343. s.OnNext(3);
  1344. s.OnCompleted();
  1345. var scheduler = new TestScheduler();
  1346. var observer = scheduler.CreateObserver<int>();
  1347. s.Subscribe(observer);
  1348. Assert.AreEqual(3, observer.Messages.Count);
  1349. Assert.AreEqual(2, observer.Messages[0].Value.Value);
  1350. Assert.AreEqual(3, observer.Messages[1].Value.Value);
  1351. Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind);
  1352. }
  1353. [TestMethod]
  1354. public void Completed_to_late_subscriber_ReplayByTime()
  1355. {
  1356. var s = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
  1357. s.OnNext(1);
  1358. s.OnNext(2);
  1359. s.OnNext(3);
  1360. s.OnCompleted();
  1361. var scheduler = new TestScheduler();
  1362. var observer = scheduler.CreateObserver<int>();
  1363. s.Subscribe(observer);
  1364. Assert.AreEqual(4, observer.Messages.Count);
  1365. Assert.AreEqual(1, observer.Messages[0].Value.Value);
  1366. Assert.AreEqual(2, observer.Messages[1].Value.Value);
  1367. Assert.AreEqual(3, observer.Messages[2].Value.Value);
  1368. Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[3].Value.Kind);
  1369. }
  1370. //Potentially already covered by Error_* tests
  1371. [TestMethod]
  1372. public void Errored_to_late_subscriber_ReplayAll()
  1373. {
  1374. var expectedException = new Exception("Test");
  1375. var s = new ReplaySubject<int>();
  1376. s.OnNext(1);
  1377. s.OnNext(2);
  1378. s.OnError(expectedException);
  1379. var scheduler = new TestScheduler();
  1380. var observer = scheduler.CreateObserver<int>();
  1381. s.Subscribe(observer);
  1382. Assert.AreEqual(3, observer.Messages.Count);
  1383. Assert.AreEqual(1, observer.Messages[0].Value.Value);
  1384. Assert.AreEqual(2, observer.Messages[1].Value.Value);
  1385. Assert.AreEqual(NotificationKind.OnError, observer.Messages[2].Value.Kind);
  1386. Assert.AreEqual(expectedException, observer.Messages[2].Value.Exception);
  1387. }
  1388. [TestMethod]
  1389. public void Errored_to_late_subscriber_ReplayOne()
  1390. {
  1391. var expectedException = new Exception("Test");
  1392. var s = new ReplaySubject<int>(1);
  1393. s.OnNext(1);
  1394. s.OnNext(2);
  1395. s.OnError(expectedException);
  1396. var scheduler = new TestScheduler();
  1397. var observer = scheduler.CreateObserver<int>();
  1398. s.Subscribe(observer);
  1399. Assert.AreEqual(2, observer.Messages.Count);
  1400. Assert.AreEqual(2, observer.Messages[0].Value.Value);
  1401. Assert.AreEqual(NotificationKind.OnError, observer.Messages[1].Value.Kind);
  1402. Assert.AreEqual(expectedException, observer.Messages[1].Value.Exception);
  1403. }
  1404. [TestMethod]
  1405. public void Errored_to_late_subscriber_ReplayMany()
  1406. {
  1407. var expectedException = new Exception("Test");
  1408. var s = new ReplaySubject<int>(2);
  1409. s.OnNext(1);
  1410. s.OnNext(2);
  1411. s.OnNext(3);
  1412. s.OnError(expectedException);
  1413. var scheduler = new TestScheduler();
  1414. var observer = scheduler.CreateObserver<int>();
  1415. s.Subscribe(observer);
  1416. Assert.AreEqual(3, observer.Messages.Count);
  1417. Assert.AreEqual(2, observer.Messages[0].Value.Value);
  1418. Assert.AreEqual(3, observer.Messages[1].Value.Value);
  1419. Assert.AreEqual(NotificationKind.OnError, observer.Messages[2].Value.Kind);
  1420. Assert.AreEqual(expectedException, observer.Messages[2].Value.Exception);
  1421. }
  1422. [TestMethod]
  1423. public void Errored_to_late_subscriber_ReplayByTime()
  1424. {
  1425. var expectedException = new Exception("Test");
  1426. var s = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
  1427. s.OnNext(1);
  1428. s.OnNext(2);
  1429. s.OnNext(3);
  1430. s.OnError(expectedException);
  1431. var scheduler = new TestScheduler();
  1432. var observer = scheduler.CreateObserver<int>();
  1433. s.Subscribe(observer);
  1434. Assert.AreEqual(4, observer.Messages.Count);
  1435. Assert.AreEqual(1, observer.Messages[0].Value.Value);
  1436. Assert.AreEqual(2, observer.Messages[1].Value.Value);
  1437. Assert.AreEqual(3, observer.Messages[2].Value.Value);
  1438. Assert.AreEqual(NotificationKind.OnError, observer.Messages[3].Value.Kind);
  1439. Assert.AreEqual(expectedException, observer.Messages[3].Value.Exception);
  1440. }
  1441. }
  1442. }