ReplaySubjectTest.cs 72 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Subjects;
  8. using System.Threading;
  9. using Microsoft.Reactive.Testing;
  10. using Microsoft.VisualStudio.TestTools.UnitTesting;
  11. using ReactiveTests.Dummies;
  12. #if !NO_TPL
  13. using System.Threading.Tasks;
  14. #endif
  15. namespace ReactiveTests.Tests
  16. {
  17. [TestClass]
  18. public partial class ReplaySubjectTest : ReactiveTest
  19. {
  20. [TestMethod]
  21. public void Subscribe_ArgumentChecking()
  22. {
  23. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>().Subscribe(null));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(1).Subscribe(null));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(2).Subscribe(null));
  26. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(DummyScheduler.Instance).Subscribe(null));
  27. }
  28. [TestMethod]
  29. public void OnError_ArgumentChecking()
  30. {
  31. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>().OnError(null));
  32. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(1).OnError(null));
  33. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(2).OnError(null));
  34. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(DummyScheduler.Instance).OnError(null));
  35. }
  36. [TestMethod]
  37. public void Constructor_ArgumentChecking()
  38. {
  39. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1));
  40. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, DummyScheduler.Instance));
  41. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, TimeSpan.Zero));
  42. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, TimeSpan.Zero, DummyScheduler.Instance));
  43. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(TimeSpan.FromTicks(-1)));
  44. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  45. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1)));
  46. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  47. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(null));
  48. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(0, null));
  49. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(TimeSpan.Zero, null));
  50. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(0, TimeSpan.Zero, null));
  51. // zero allowed
  52. new ReplaySubject<int>(0);
  53. new ReplaySubject<int>(TimeSpan.Zero);
  54. new ReplaySubject<int>(0, TimeSpan.Zero);
  55. new ReplaySubject<int>(0, DummyScheduler.Instance);
  56. new ReplaySubject<int>(TimeSpan.Zero, DummyScheduler.Instance);
  57. new ReplaySubject<int>(0, TimeSpan.Zero, DummyScheduler.Instance);
  58. }
  59. [TestMethod]
  60. public void Infinite_ReplayByTime()
  61. {
  62. var scheduler = new TestScheduler();
  63. var xs = scheduler.CreateHotObservable(
  64. OnNext(70, 1),
  65. OnNext(110, 2),
  66. OnNext(220, 3),
  67. OnNext(270, 4),
  68. OnNext(340, 5),
  69. OnNext(410, 6),
  70. OnNext(520, 7),
  71. OnNext(630, 8),
  72. OnNext(710, 9),
  73. OnNext(870, 10),
  74. OnNext(940, 11),
  75. OnNext(1020, 12)
  76. );
  77. var subject = default(ReplaySubject<int>);
  78. var subscription = default(IDisposable);
  79. var results1 = scheduler.CreateObserver<int>();
  80. var subscription1 = default(IDisposable);
  81. var results2 = scheduler.CreateObserver<int>();
  82. var subscription2 = default(IDisposable);
  83. var results3 = scheduler.CreateObserver<int>();
  84. var subscription3 = default(IDisposable);
  85. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  86. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  87. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  88. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  89. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  90. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  91. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  92. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  93. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  94. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  95. scheduler.Start();
  96. results1.Messages.AssertEqual(
  97. OnNext(301, 3),
  98. OnNext(302, 4),
  99. OnNext(341, 5),
  100. OnNext(411, 6),
  101. OnNext(521, 7)
  102. );
  103. results2.Messages.AssertEqual(
  104. OnNext(401, 5),
  105. OnNext(411, 6),
  106. OnNext(521, 7),
  107. OnNext(631, 8)
  108. );
  109. results3.Messages.AssertEqual(
  110. OnNext(901, 10),
  111. OnNext(941, 11)
  112. );
  113. }
  114. [TestMethod]
  115. public void Infinite_ReplayOne()
  116. {
  117. var scheduler = new TestScheduler();
  118. var xs = scheduler.CreateHotObservable(
  119. OnNext(70, 1),
  120. OnNext(110, 2),
  121. OnNext(220, 3),
  122. OnNext(270, 4),
  123. OnNext(340, 5),
  124. OnNext(410, 6),
  125. OnNext(520, 7),
  126. OnNext(630, 8),
  127. OnNext(710, 9),
  128. OnNext(870, 10),
  129. OnNext(940, 11),
  130. OnNext(1020, 12)
  131. );
  132. var subject = default(ReplaySubject<int>);
  133. var subscription = default(IDisposable);
  134. var results1 = scheduler.CreateObserver<int>();
  135. var subscription1 = default(IDisposable);
  136. var results2 = scheduler.CreateObserver<int>();
  137. var subscription2 = default(IDisposable);
  138. var results3 = scheduler.CreateObserver<int>();
  139. var subscription3 = default(IDisposable);
  140. var results4 = scheduler.CreateObserver<int>();
  141. var subscription4 = default(IDisposable);
  142. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  143. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  144. scheduler.ScheduleAbsolute(1200, () => subscription.Dispose());
  145. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  146. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  147. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  148. scheduler.ScheduleAbsolute(1100, () => subscription4 = subject.Subscribe(results4));
  149. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  150. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  151. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  152. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  153. scheduler.Start();
  154. results1.Messages.AssertEqual(
  155. OnNext(300, 4),
  156. OnNext(340, 5),
  157. OnNext(410, 6),
  158. OnNext(520, 7)
  159. );
  160. results2.Messages.AssertEqual(
  161. OnNext(400, 5),
  162. OnNext(410, 6),
  163. OnNext(520, 7),
  164. OnNext(630, 8)
  165. );
  166. results3.Messages.AssertEqual(
  167. OnNext(900, 10),
  168. OnNext(940, 11)
  169. );
  170. results4.Messages.AssertEqual(
  171. OnNext(1100, 12)
  172. );
  173. }
  174. [TestMethod]
  175. public void Infinite_ReplayMany()
  176. {
  177. var scheduler = new TestScheduler();
  178. var xs = scheduler.CreateHotObservable(
  179. OnNext(70, 1),
  180. OnNext(110, 2),
  181. OnNext(220, 3),
  182. OnNext(270, 4),
  183. OnNext(340, 5),
  184. OnNext(410, 6),
  185. OnNext(520, 7),
  186. OnNext(630, 8),
  187. OnNext(710, 9),
  188. OnNext(870, 10),
  189. OnNext(940, 11),
  190. OnNext(1020, 12)
  191. );
  192. var subject = default(ReplaySubject<int>);
  193. var subscription = default(IDisposable);
  194. var results1 = scheduler.CreateObserver<int>();
  195. var subscription1 = default(IDisposable);
  196. var results2 = scheduler.CreateObserver<int>();
  197. var subscription2 = default(IDisposable);
  198. var results3 = scheduler.CreateObserver<int>();
  199. var subscription3 = default(IDisposable);
  200. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  201. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  202. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  203. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  204. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  205. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  206. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  207. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  208. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  209. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  210. scheduler.Start();
  211. results1.Messages.AssertEqual(
  212. OnNext(300, 3),
  213. OnNext(300, 4),
  214. OnNext(340, 5),
  215. OnNext(410, 6),
  216. OnNext(520, 7)
  217. );
  218. results2.Messages.AssertEqual(
  219. OnNext(400, 3),
  220. OnNext(400, 4),
  221. OnNext(400, 5),
  222. OnNext(410, 6),
  223. OnNext(520, 7),
  224. OnNext(630, 8)
  225. );
  226. results3.Messages.AssertEqual(
  227. OnNext(900, 8),
  228. OnNext(900, 9),
  229. OnNext(900, 10),
  230. OnNext(940, 11)
  231. );
  232. }
  233. [TestMethod]
  234. public void Infinite_ReplayAll()
  235. {
  236. var scheduler = new TestScheduler();
  237. var xs = scheduler.CreateHotObservable(
  238. OnNext(70, 1),
  239. OnNext(110, 2),
  240. OnNext(220, 3),
  241. OnNext(270, 4),
  242. OnNext(340, 5),
  243. OnNext(410, 6),
  244. OnNext(520, 7),
  245. OnNext(630, 8),
  246. OnNext(710, 9),
  247. OnNext(870, 10),
  248. OnNext(940, 11),
  249. OnNext(1020, 12)
  250. );
  251. var subject = default(ReplaySubject<int>);
  252. var subscription = default(IDisposable);
  253. var results1 = scheduler.CreateObserver<int>();
  254. var subscription1 = default(IDisposable);
  255. var results2 = scheduler.CreateObserver<int>();
  256. var subscription2 = default(IDisposable);
  257. var results3 = scheduler.CreateObserver<int>();
  258. var subscription3 = default(IDisposable);
  259. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  260. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  261. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  262. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  263. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  264. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  265. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  266. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  267. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  268. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  269. scheduler.Start();
  270. results1.Messages.AssertEqual(
  271. OnNext(300, 3),
  272. OnNext(300, 4),
  273. OnNext(340, 5),
  274. OnNext(410, 6),
  275. OnNext(520, 7)
  276. );
  277. results2.Messages.AssertEqual(
  278. OnNext(400, 3),
  279. OnNext(400, 4),
  280. OnNext(400, 5),
  281. OnNext(410, 6),
  282. OnNext(520, 7),
  283. OnNext(630, 8)
  284. );
  285. results3.Messages.AssertEqual(
  286. OnNext(900, 3),
  287. OnNext(900, 4),
  288. OnNext(900, 5),
  289. OnNext(900, 6),
  290. OnNext(900, 7),
  291. OnNext(900, 8),
  292. OnNext(900, 9),
  293. OnNext(900, 10),
  294. OnNext(940, 11)
  295. );
  296. }
  297. [TestMethod]
  298. public void Infinite2()
  299. {
  300. var scheduler = new TestScheduler();
  301. var xs = scheduler.CreateHotObservable(
  302. OnNext(70, 1),
  303. OnNext(110, 2),
  304. OnNext(220, 3),
  305. OnNext(270, 4),
  306. OnNext(280, -1),
  307. OnNext(290, -2),
  308. OnNext(340, 5),
  309. OnNext(410, 6),
  310. OnNext(520, 7),
  311. OnNext(630, 8),
  312. OnNext(710, 9),
  313. OnNext(870, 10),
  314. OnNext(940, 11),
  315. OnNext(1020, 12)
  316. );
  317. var subject = default(ReplaySubject<int>);
  318. var subscription = default(IDisposable);
  319. var results1 = scheduler.CreateObserver<int>();
  320. var subscription1 = default(IDisposable);
  321. var results2 = scheduler.CreateObserver<int>();
  322. var subscription2 = default(IDisposable);
  323. var results3 = scheduler.CreateObserver<int>();
  324. var subscription3 = default(IDisposable);
  325. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  326. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  327. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  328. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  329. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  330. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  331. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  332. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  333. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  334. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  335. scheduler.Start();
  336. results1.Messages.AssertEqual(
  337. OnNext(301, 4),
  338. OnNext(302, -1),
  339. OnNext(303, -2),
  340. OnNext(341, 5),
  341. OnNext(411, 6),
  342. OnNext(521, 7)
  343. );
  344. results2.Messages.AssertEqual(
  345. OnNext(401, 5),
  346. OnNext(411, 6),
  347. OnNext(521, 7),
  348. OnNext(631, 8)
  349. );
  350. results3.Messages.AssertEqual(
  351. OnNext(901, 10),
  352. OnNext(941, 11)
  353. );
  354. }
  355. [TestMethod]
  356. public void Finite_ReplayByTime()
  357. {
  358. var scheduler = new TestScheduler();
  359. var xs = scheduler.CreateHotObservable(
  360. OnNext(70, 1),
  361. OnNext(110, 2),
  362. OnNext(220, 3),
  363. OnNext(270, 4),
  364. OnNext(340, 5),
  365. OnNext(410, 6),
  366. OnNext(520, 7),
  367. OnCompleted<int>(630),
  368. OnNext(640, 9),
  369. OnCompleted<int>(650),
  370. OnError<int>(660, new Exception())
  371. );
  372. var subject = default(ReplaySubject<int>);
  373. var subscription = default(IDisposable);
  374. var results1 = scheduler.CreateObserver<int>();
  375. var subscription1 = default(IDisposable);
  376. var results2 = scheduler.CreateObserver<int>();
  377. var subscription2 = default(IDisposable);
  378. var results3 = scheduler.CreateObserver<int>();
  379. var subscription3 = default(IDisposable);
  380. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  381. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  382. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  383. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  384. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  385. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  386. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  387. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  388. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  389. scheduler.Start();
  390. results1.Messages.AssertEqual(
  391. OnNext(301, 3),
  392. OnNext(302, 4),
  393. OnNext(341, 5),
  394. OnNext(411, 6),
  395. OnNext(521, 7)
  396. );
  397. results2.Messages.AssertEqual(
  398. OnNext(401, 5),
  399. OnNext(411, 6),
  400. OnNext(521, 7),
  401. OnCompleted<int>(631)
  402. );
  403. results3.Messages.AssertEqual(
  404. OnCompleted<int>(901)
  405. );
  406. }
  407. [TestMethod]
  408. public void Finite_ReplayOne()
  409. {
  410. var scheduler = new TestScheduler();
  411. var xs = scheduler.CreateHotObservable(
  412. OnNext(70, 1),
  413. OnNext(110, 2),
  414. OnNext(220, 3),
  415. OnNext(270, 4),
  416. OnNext(340, 5),
  417. OnNext(410, 6),
  418. OnNext(520, 7),
  419. OnCompleted<int>(630),
  420. OnNext(640, 9),
  421. OnCompleted<int>(650),
  422. OnError<int>(660, new Exception())
  423. );
  424. var subject = default(ReplaySubject<int>);
  425. var subscription = default(IDisposable);
  426. var results1 = scheduler.CreateObserver<int>();
  427. var subscription1 = default(IDisposable);
  428. var results2 = scheduler.CreateObserver<int>();
  429. var subscription2 = default(IDisposable);
  430. var results3 = scheduler.CreateObserver<int>();
  431. var subscription3 = default(IDisposable);
  432. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  433. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  434. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  435. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  436. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  437. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  438. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  439. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  440. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  441. scheduler.Start();
  442. results1.Messages.AssertEqual(
  443. OnNext(300, 4),
  444. OnNext(340, 5),
  445. OnNext(410, 6),
  446. OnNext(520, 7)
  447. );
  448. results2.Messages.AssertEqual(
  449. OnNext(400, 5),
  450. OnNext(410, 6),
  451. OnNext(520, 7),
  452. OnCompleted<int>(630)
  453. );
  454. results3.Messages.AssertEqual(
  455. OnNext(900, 7),
  456. OnCompleted<int>(900)
  457. );
  458. }
  459. [TestMethod]
  460. public void Finite_ReplayMany()
  461. {
  462. var scheduler = new TestScheduler();
  463. var xs = scheduler.CreateHotObservable(
  464. OnNext(70, 1),
  465. OnNext(110, 2),
  466. OnNext(220, 3),
  467. OnNext(270, 4),
  468. OnNext(340, 5),
  469. OnNext(410, 6),
  470. OnNext(520, 7),
  471. OnCompleted<int>(630),
  472. OnNext(640, 9),
  473. OnCompleted<int>(650),
  474. OnError<int>(660, new Exception())
  475. );
  476. var subject = default(ReplaySubject<int>);
  477. var subscription = default(IDisposable);
  478. var results1 = scheduler.CreateObserver<int>();
  479. var subscription1 = default(IDisposable);
  480. var results2 = scheduler.CreateObserver<int>();
  481. var subscription2 = default(IDisposable);
  482. var results3 = scheduler.CreateObserver<int>();
  483. var subscription3 = default(IDisposable);
  484. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  485. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  486. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  487. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  488. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  489. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  490. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  491. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  492. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  493. scheduler.Start();
  494. results1.Messages.AssertEqual(
  495. OnNext(300, 3),
  496. OnNext(300, 4),
  497. OnNext(340, 5),
  498. OnNext(410, 6),
  499. OnNext(520, 7)
  500. );
  501. results2.Messages.AssertEqual(
  502. OnNext(400, 3),
  503. OnNext(400, 4),
  504. OnNext(400, 5),
  505. OnNext(410, 6),
  506. OnNext(520, 7),
  507. OnCompleted<int>(630)
  508. );
  509. results3.Messages.AssertEqual(
  510. OnNext(900, 5),
  511. OnNext(900, 6),
  512. OnNext(900, 7),
  513. OnCompleted<int>(900)
  514. );
  515. }
  516. [TestMethod]
  517. public void Finite_ReplayAll()
  518. {
  519. var scheduler = new TestScheduler();
  520. var xs = scheduler.CreateHotObservable(
  521. OnNext(70, 1),
  522. OnNext(110, 2),
  523. OnNext(220, 3),
  524. OnNext(270, 4),
  525. OnNext(340, 5),
  526. OnNext(410, 6),
  527. OnNext(520, 7),
  528. OnCompleted<int>(630),
  529. OnNext(640, 9),
  530. OnCompleted<int>(650),
  531. OnError<int>(660, new Exception())
  532. );
  533. var subject = default(ReplaySubject<int>);
  534. var subscription = default(IDisposable);
  535. var results1 = scheduler.CreateObserver<int>();
  536. var subscription1 = default(IDisposable);
  537. var results2 = scheduler.CreateObserver<int>();
  538. var subscription2 = default(IDisposable);
  539. var results3 = scheduler.CreateObserver<int>();
  540. var subscription3 = default(IDisposable);
  541. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  542. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  543. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  544. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  545. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  546. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  547. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  548. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  549. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  550. scheduler.Start();
  551. results1.Messages.AssertEqual(
  552. OnNext(300, 3),
  553. OnNext(300, 4),
  554. OnNext(340, 5),
  555. OnNext(410, 6),
  556. OnNext(520, 7)
  557. );
  558. results2.Messages.AssertEqual(
  559. OnNext(400, 3),
  560. OnNext(400, 4),
  561. OnNext(400, 5),
  562. OnNext(410, 6),
  563. OnNext(520, 7),
  564. OnCompleted<int>(630)
  565. );
  566. results3.Messages.AssertEqual(
  567. OnNext(900, 3),
  568. OnNext(900, 4),
  569. OnNext(900, 5),
  570. OnNext(900, 6),
  571. OnNext(900, 7),
  572. OnCompleted<int>(900)
  573. );
  574. }
  575. [TestMethod]
  576. public void Error_ReplayByTime()
  577. {
  578. var scheduler = new TestScheduler();
  579. var ex = new Exception();
  580. var xs = scheduler.CreateHotObservable(
  581. OnNext(70, 1),
  582. OnNext(110, 2),
  583. OnNext(220, 3),
  584. OnNext(270, 4),
  585. OnNext(340, 5),
  586. OnNext(410, 6),
  587. OnNext(520, 7),
  588. OnError<int>(630, ex),
  589. OnNext(640, 9),
  590. OnCompleted<int>(650),
  591. OnError<int>(660, new Exception())
  592. );
  593. var subject = default(ReplaySubject<int>);
  594. var subscription = default(IDisposable);
  595. var results1 = scheduler.CreateObserver<int>();
  596. var subscription1 = default(IDisposable);
  597. var results2 = scheduler.CreateObserver<int>();
  598. var subscription2 = default(IDisposable);
  599. var results3 = scheduler.CreateObserver<int>();
  600. var subscription3 = default(IDisposable);
  601. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  602. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  603. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  604. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  605. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  606. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  607. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  608. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  609. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  610. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  611. scheduler.Start();
  612. results1.Messages.AssertEqual(
  613. OnNext(301, 3),
  614. OnNext(302, 4),
  615. OnNext(341, 5),
  616. OnNext(411, 6),
  617. OnNext(521, 7)
  618. );
  619. results2.Messages.AssertEqual(
  620. OnNext(401, 5),
  621. OnNext(411, 6),
  622. OnNext(521, 7),
  623. OnError<int>(631, ex)
  624. );
  625. results3.Messages.AssertEqual(
  626. OnError<int>(901, ex)
  627. );
  628. }
  629. [TestMethod]
  630. public void Error_ReplayOne()
  631. {
  632. var scheduler = new TestScheduler();
  633. var ex = new Exception();
  634. var xs = scheduler.CreateHotObservable(
  635. OnNext(70, 1),
  636. OnNext(110, 2),
  637. OnNext(220, 3),
  638. OnNext(270, 4),
  639. OnNext(340, 5),
  640. OnNext(410, 6),
  641. OnNext(520, 7),
  642. OnError<int>(630, ex),
  643. OnNext(640, 9),
  644. OnCompleted<int>(650),
  645. OnError<int>(660, new Exception())
  646. );
  647. var subject = default(ReplaySubject<int>);
  648. var subscription = default(IDisposable);
  649. var results1 = scheduler.CreateObserver<int>();
  650. var subscription1 = default(IDisposable);
  651. var results2 = scheduler.CreateObserver<int>();
  652. var subscription2 = default(IDisposable);
  653. var results3 = scheduler.CreateObserver<int>();
  654. var subscription3 = default(IDisposable);
  655. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  656. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  657. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  658. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  659. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  660. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  661. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  662. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  663. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  664. scheduler.Start();
  665. results1.Messages.AssertEqual(
  666. OnNext(300, 4),
  667. OnNext(340, 5),
  668. OnNext(410, 6),
  669. OnNext(520, 7)
  670. );
  671. results2.Messages.AssertEqual(
  672. OnNext(400, 5),
  673. OnNext(410, 6),
  674. OnNext(520, 7),
  675. OnError<int>(630, ex)
  676. );
  677. results3.Messages.AssertEqual(
  678. OnNext(900, 7),
  679. OnError<int>(900, ex)
  680. );
  681. }
  682. [TestMethod]
  683. public void Error_ReplayMany()
  684. {
  685. var scheduler = new TestScheduler();
  686. var ex = new Exception();
  687. var xs = scheduler.CreateHotObservable(
  688. OnNext(70, 1),
  689. OnNext(110, 2),
  690. OnNext(220, 3),
  691. OnNext(270, 4),
  692. OnNext(340, 5),
  693. OnNext(410, 6),
  694. OnNext(520, 7),
  695. OnError<int>(630, ex),
  696. OnNext(640, 9),
  697. OnCompleted<int>(650),
  698. OnError<int>(660, new Exception())
  699. );
  700. var subject = default(ReplaySubject<int>);
  701. var subscription = default(IDisposable);
  702. var results1 = scheduler.CreateObserver<int>();
  703. var subscription1 = default(IDisposable);
  704. var results2 = scheduler.CreateObserver<int>();
  705. var subscription2 = default(IDisposable);
  706. var results3 = scheduler.CreateObserver<int>();
  707. var subscription3 = default(IDisposable);
  708. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  709. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  710. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  711. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  712. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  713. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  714. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  715. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  716. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  717. scheduler.Start();
  718. results1.Messages.AssertEqual(
  719. OnNext(300, 3),
  720. OnNext(300, 4),
  721. OnNext(340, 5),
  722. OnNext(410, 6),
  723. OnNext(520, 7)
  724. );
  725. results2.Messages.AssertEqual(
  726. OnNext(400, 3),
  727. OnNext(400, 4),
  728. OnNext(400, 5),
  729. OnNext(410, 6),
  730. OnNext(520, 7),
  731. OnError<int>(630, ex)
  732. );
  733. results3.Messages.AssertEqual(
  734. OnNext(900, 5),
  735. OnNext(900, 6),
  736. OnNext(900, 7),
  737. OnError<int>(900, ex)
  738. );
  739. }
  740. [TestMethod]
  741. public void Error_ReplayAll()
  742. {
  743. var scheduler = new TestScheduler();
  744. var ex = new Exception();
  745. var xs = scheduler.CreateHotObservable(
  746. OnNext(70, 1),
  747. OnNext(110, 2),
  748. OnNext(220, 3),
  749. OnNext(270, 4),
  750. OnNext(340, 5),
  751. OnNext(410, 6),
  752. OnNext(520, 7),
  753. OnError<int>(630, ex),
  754. OnNext(640, 9),
  755. OnCompleted<int>(650),
  756. OnError<int>(660, new Exception())
  757. );
  758. var subject = default(ReplaySubject<int>);
  759. var subscription = default(IDisposable);
  760. var results1 = scheduler.CreateObserver<int>();
  761. var subscription1 = default(IDisposable);
  762. var results2 = scheduler.CreateObserver<int>();
  763. var subscription2 = default(IDisposable);
  764. var results3 = scheduler.CreateObserver<int>();
  765. var subscription3 = default(IDisposable);
  766. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  767. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  768. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  769. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  770. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  771. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  772. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  773. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  774. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  775. scheduler.Start();
  776. results1.Messages.AssertEqual(
  777. OnNext(300, 3),
  778. OnNext(300, 4),
  779. OnNext(340, 5),
  780. OnNext(410, 6),
  781. OnNext(520, 7)
  782. );
  783. results2.Messages.AssertEqual(
  784. OnNext(400, 3),
  785. OnNext(400, 4),
  786. OnNext(400, 5),
  787. OnNext(410, 6),
  788. OnNext(520, 7),
  789. OnError<int>(630, ex)
  790. );
  791. results3.Messages.AssertEqual(
  792. OnNext(900, 3),
  793. OnNext(900, 4),
  794. OnNext(900, 5),
  795. OnNext(900, 6),
  796. OnNext(900, 7),
  797. OnError<int>(900, ex)
  798. );
  799. }
  800. [TestMethod]
  801. public void Canceled_ReplayByTime()
  802. {
  803. var scheduler = new TestScheduler();
  804. var xs = scheduler.CreateHotObservable(
  805. OnCompleted<int>(630),
  806. OnNext(640, 9),
  807. OnCompleted<int>(650),
  808. OnError<int>(660, new Exception())
  809. );
  810. var subject = default(ReplaySubject<int>);
  811. var subscription = default(IDisposable);
  812. var results1 = scheduler.CreateObserver<int>();
  813. var subscription1 = default(IDisposable);
  814. var results2 = scheduler.CreateObserver<int>();
  815. var subscription2 = default(IDisposable);
  816. var results3 = scheduler.CreateObserver<int>();
  817. var subscription3 = default(IDisposable);
  818. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  819. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  820. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  821. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  822. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  823. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  824. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  825. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  826. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  827. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  828. scheduler.Start();
  829. results1.Messages.AssertEqual(
  830. );
  831. results2.Messages.AssertEqual(
  832. OnCompleted<int>(631)
  833. );
  834. results3.Messages.AssertEqual(
  835. OnCompleted<int>(901)
  836. );
  837. }
  838. [TestMethod]
  839. public void Canceled_ReplayOne()
  840. {
  841. var scheduler = new TestScheduler();
  842. var xs = scheduler.CreateHotObservable(
  843. OnCompleted<int>(630),
  844. OnNext(640, 9),
  845. OnCompleted<int>(650),
  846. OnError<int>(660, new Exception())
  847. );
  848. var subject = default(ReplaySubject<int>);
  849. var subscription = default(IDisposable);
  850. var results1 = scheduler.CreateObserver<int>();
  851. var subscription1 = default(IDisposable);
  852. var results2 = scheduler.CreateObserver<int>();
  853. var subscription2 = default(IDisposable);
  854. var results3 = scheduler.CreateObserver<int>();
  855. var subscription3 = default(IDisposable);
  856. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  857. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  858. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  859. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  860. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  861. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  862. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  863. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  864. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  865. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  866. scheduler.Start();
  867. results1.Messages.AssertEqual(
  868. );
  869. results2.Messages.AssertEqual(
  870. OnCompleted<int>(630)
  871. );
  872. results3.Messages.AssertEqual(
  873. OnCompleted<int>(900)
  874. );
  875. }
  876. [TestMethod]
  877. public void Canceled_ReplayMany()
  878. {
  879. var scheduler = new TestScheduler();
  880. var xs = scheduler.CreateHotObservable(
  881. OnCompleted<int>(630),
  882. OnNext(640, 9),
  883. OnCompleted<int>(650),
  884. OnError<int>(660, new Exception())
  885. );
  886. var subject = default(ReplaySubject<int>);
  887. var subscription = default(IDisposable);
  888. var results1 = scheduler.CreateObserver<int>();
  889. var subscription1 = default(IDisposable);
  890. var results2 = scheduler.CreateObserver<int>();
  891. var subscription2 = default(IDisposable);
  892. var results3 = scheduler.CreateObserver<int>();
  893. var subscription3 = default(IDisposable);
  894. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  895. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  896. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  897. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  898. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  899. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  900. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  901. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  902. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  903. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  904. scheduler.Start();
  905. results1.Messages.AssertEqual(
  906. );
  907. results2.Messages.AssertEqual(
  908. OnCompleted<int>(630)
  909. );
  910. results3.Messages.AssertEqual(
  911. OnCompleted<int>(900)
  912. );
  913. }
  914. [TestMethod]
  915. public void Canceled_ReplayAll()
  916. {
  917. var scheduler = new TestScheduler();
  918. var xs = scheduler.CreateHotObservable(
  919. OnCompleted<int>(630),
  920. OnNext(640, 9),
  921. OnCompleted<int>(650),
  922. OnError<int>(660, new Exception())
  923. );
  924. var subject = default(ReplaySubject<int>);
  925. var subscription = default(IDisposable);
  926. var results1 = scheduler.CreateObserver<int>();
  927. var subscription1 = default(IDisposable);
  928. var results2 = scheduler.CreateObserver<int>();
  929. var subscription2 = default(IDisposable);
  930. var results3 = scheduler.CreateObserver<int>();
  931. var subscription3 = default(IDisposable);
  932. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  933. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  934. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  935. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  936. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  937. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  938. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  939. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  940. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  941. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  942. scheduler.Start();
  943. results1.Messages.AssertEqual(
  944. );
  945. results2.Messages.AssertEqual(
  946. OnCompleted<int>(630)
  947. );
  948. results3.Messages.AssertEqual(
  949. OnCompleted<int>(900)
  950. );
  951. }
  952. [TestMethod]
  953. public void SubjectDisposed()
  954. {
  955. var scheduler = new TestScheduler();
  956. var subject = default(ReplaySubject<int>);
  957. var results1 = scheduler.CreateObserver<int>();
  958. var subscription1 = default(IDisposable);
  959. var results2 = scheduler.CreateObserver<int>();
  960. var subscription2 = default(IDisposable);
  961. var results3 = scheduler.CreateObserver<int>();
  962. var subscription3 = default(IDisposable);
  963. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(scheduler));
  964. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  965. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  966. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  967. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  968. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  969. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  970. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  971. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  972. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  973. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  974. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  975. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  976. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  977. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  978. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  979. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  980. scheduler.Start();
  981. results1.Messages.AssertEqual(
  982. OnNext(201, 1),
  983. OnNext(251, 2),
  984. OnNext(351, 3),
  985. OnNext(451, 4)
  986. );
  987. results2.Messages.AssertEqual(
  988. OnNext(301, 1),
  989. OnNext(302, 2),
  990. OnNext(351, 3),
  991. OnNext(451, 4),
  992. OnNext(551, 5)
  993. );
  994. results3.Messages.AssertEqual(
  995. OnNext(401, 1),
  996. OnNext(402, 2),
  997. OnNext(403, 3),
  998. OnNext(451, 4),
  999. OnNext(551, 5)
  1000. );
  1001. }
  1002. [TestMethod]
  1003. public void SubjectDisposed_ReplayOne()
  1004. {
  1005. var scheduler = new TestScheduler();
  1006. var subject = default(ReplaySubject<int>);
  1007. var results1 = scheduler.CreateObserver<int>();
  1008. var subscription1 = default(IDisposable);
  1009. var results2 = scheduler.CreateObserver<int>();
  1010. var subscription2 = default(IDisposable);
  1011. var results3 = scheduler.CreateObserver<int>();
  1012. var subscription3 = default(IDisposable);
  1013. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  1014. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1015. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1016. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1017. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1018. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1019. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1020. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1021. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1022. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1023. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1024. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1025. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1026. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1027. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1028. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1029. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1030. scheduler.Start();
  1031. results1.Messages.AssertEqual(
  1032. OnNext(200, 1),
  1033. OnNext(250, 2),
  1034. OnNext(350, 3),
  1035. OnNext(450, 4)
  1036. );
  1037. results2.Messages.AssertEqual(
  1038. OnNext(300, 2),
  1039. OnNext(350, 3),
  1040. OnNext(450, 4),
  1041. OnNext(550, 5)
  1042. );
  1043. results3.Messages.AssertEqual(
  1044. OnNext(400, 3),
  1045. OnNext(450, 4),
  1046. OnNext(550, 5)
  1047. );
  1048. }
  1049. [TestMethod]
  1050. public void SubjectDisposed_ReplayMany()
  1051. {
  1052. var scheduler = new TestScheduler();
  1053. var subject = default(ReplaySubject<int>);
  1054. var results1 = scheduler.CreateObserver<int>();
  1055. var subscription1 = default(IDisposable);
  1056. var results2 = scheduler.CreateObserver<int>();
  1057. var subscription2 = default(IDisposable);
  1058. var results3 = scheduler.CreateObserver<int>();
  1059. var subscription3 = default(IDisposable);
  1060. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  1061. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1062. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1063. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1064. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1065. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1066. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1067. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1068. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1069. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1070. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1071. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1072. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1073. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1074. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1075. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1076. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1077. scheduler.Start();
  1078. results1.Messages.AssertEqual(
  1079. OnNext(200, 1),
  1080. OnNext(250, 2),
  1081. OnNext(350, 3),
  1082. OnNext(450, 4)
  1083. );
  1084. results2.Messages.AssertEqual(
  1085. OnNext(300, 1),
  1086. OnNext(300, 2),
  1087. OnNext(350, 3),
  1088. OnNext(450, 4),
  1089. OnNext(550, 5)
  1090. );
  1091. results3.Messages.AssertEqual(
  1092. OnNext(400, 1),
  1093. OnNext(400, 2),
  1094. OnNext(400, 3),
  1095. OnNext(450, 4),
  1096. OnNext(550, 5)
  1097. );
  1098. }
  1099. [TestMethod]
  1100. public void SubjectDisposed_ReplayAll()
  1101. {
  1102. var scheduler = new TestScheduler();
  1103. var subject = default(ReplaySubject<int>);
  1104. var results1 = scheduler.CreateObserver<int>();
  1105. var subscription1 = default(IDisposable);
  1106. var results2 = scheduler.CreateObserver<int>();
  1107. var subscription2 = default(IDisposable);
  1108. var results3 = scheduler.CreateObserver<int>();
  1109. var subscription3 = default(IDisposable);
  1110. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  1111. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1112. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1113. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1114. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1115. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1116. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1117. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1118. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1119. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1120. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1121. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1122. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1123. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1124. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1125. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1126. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1127. scheduler.Start();
  1128. results1.Messages.AssertEqual(
  1129. OnNext(200, 1),
  1130. OnNext(250, 2),
  1131. OnNext(350, 3),
  1132. OnNext(450, 4)
  1133. );
  1134. results2.Messages.AssertEqual(
  1135. OnNext(300, 1),
  1136. OnNext(300, 2),
  1137. OnNext(350, 3),
  1138. OnNext(450, 4),
  1139. OnNext(550, 5)
  1140. );
  1141. results3.Messages.AssertEqual(
  1142. OnNext(400, 1),
  1143. OnNext(400, 2),
  1144. OnNext(400, 3),
  1145. OnNext(450, 4),
  1146. OnNext(550, 5)
  1147. );
  1148. }
  1149. //
  1150. // TODO: Create a failing test for this for the other implementations (ReplayOne/Many/All).
  1151. // I don't understand the behavior.
  1152. // I think it may have to do with calling Trim() on Subscription (as well as in the OnNext calls). -LC
  1153. //
  1154. [TestMethod]
  1155. public void ReplaySubjectDiesOut()
  1156. {
  1157. //
  1158. // Tests v1.x behavior as documented in ReplaySubject.cs (Subscribe method).
  1159. //
  1160. var scheduler = new TestScheduler();
  1161. var xs = scheduler.CreateHotObservable(
  1162. OnNext(70, 1),
  1163. OnNext(110, 2),
  1164. OnNext(220, 3),
  1165. OnNext(270, 4),
  1166. OnNext(340, 5),
  1167. OnNext(410, 6),
  1168. OnNext(520, 7),
  1169. OnCompleted<int>(580)
  1170. );
  1171. var subject = default(ReplaySubject<int>);
  1172. var results1 = scheduler.CreateObserver<int>();
  1173. var results2 = scheduler.CreateObserver<int>();
  1174. var results3 = scheduler.CreateObserver<int>();
  1175. var results4 = scheduler.CreateObserver<int>();
  1176. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(int.MaxValue, TimeSpan.FromTicks(100), scheduler));
  1177. scheduler.ScheduleAbsolute(200, () => xs.Subscribe(subject));
  1178. scheduler.ScheduleAbsolute(300, () => subject.Subscribe(results1));
  1179. scheduler.ScheduleAbsolute(400, () => subject.Subscribe(results2));
  1180. scheduler.ScheduleAbsolute(600, () => subject.Subscribe(results3));
  1181. scheduler.ScheduleAbsolute(900, () => subject.Subscribe(results4));
  1182. scheduler.Start();
  1183. results1.Messages.AssertEqual(
  1184. OnNext(301, 3),
  1185. OnNext(302, 4),
  1186. OnNext(341, 5),
  1187. OnNext(411, 6),
  1188. OnNext(521, 7),
  1189. OnCompleted<int>(581)
  1190. );
  1191. results2.Messages.AssertEqual(
  1192. OnNext(401, 5),
  1193. OnNext(411, 6),
  1194. OnNext(521, 7),
  1195. OnCompleted<int>(581)
  1196. );
  1197. results3.Messages.AssertEqual(
  1198. OnNext(601, 7),
  1199. OnCompleted<int>(602)
  1200. );
  1201. results4.Messages.AssertEqual(
  1202. OnCompleted<int>(901)
  1203. );
  1204. }
  1205. [TestMethod]
  1206. public void HasObservers()
  1207. {
  1208. HasObservers(new ReplaySubject<int>());
  1209. HasObservers(new ReplaySubject<int>(1));
  1210. HasObservers(new ReplaySubject<int>(3));
  1211. HasObservers(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1212. }
  1213. private static void HasObservers(ReplaySubject<int> s)
  1214. {
  1215. Assert.IsFalse(s.HasObservers);
  1216. var d1 = s.Subscribe(_ => { });
  1217. Assert.IsTrue(s.HasObservers);
  1218. d1.Dispose();
  1219. Assert.IsFalse(s.HasObservers);
  1220. var d2 = s.Subscribe(_ => { });
  1221. Assert.IsTrue(s.HasObservers);
  1222. var d3 = s.Subscribe(_ => { });
  1223. Assert.IsTrue(s.HasObservers);
  1224. d2.Dispose();
  1225. Assert.IsTrue(s.HasObservers);
  1226. d3.Dispose();
  1227. Assert.IsFalse(s.HasObservers);
  1228. }
  1229. [TestMethod]
  1230. public void HasObservers_Dispose1()
  1231. {
  1232. HasObservers_Dispose1(new ReplaySubject<int>());
  1233. HasObservers_Dispose1(new ReplaySubject<int>(1));
  1234. HasObservers_Dispose1(new ReplaySubject<int>(3));
  1235. HasObservers_Dispose1(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1236. }
  1237. private static void HasObservers_Dispose1(ReplaySubject<int> s)
  1238. {
  1239. Assert.IsFalse(s.HasObservers);
  1240. Assert.IsFalse(s.IsDisposed);
  1241. var d = s.Subscribe(_ => { });
  1242. Assert.IsTrue(s.HasObservers);
  1243. Assert.IsFalse(s.IsDisposed);
  1244. s.Dispose();
  1245. Assert.IsFalse(s.HasObservers);
  1246. Assert.IsTrue(s.IsDisposed);
  1247. d.Dispose();
  1248. Assert.IsFalse(s.HasObservers);
  1249. Assert.IsTrue(s.IsDisposed);
  1250. }
  1251. [TestMethod]
  1252. public void HasObservers_Dispose2()
  1253. {
  1254. HasObservers_Dispose2(new ReplaySubject<int>());
  1255. HasObservers_Dispose2(new ReplaySubject<int>(1));
  1256. HasObservers_Dispose2(new ReplaySubject<int>(3));
  1257. HasObservers_Dispose2(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1258. }
  1259. private static void HasObservers_Dispose2(ReplaySubject<int> s)
  1260. {
  1261. Assert.IsFalse(s.HasObservers);
  1262. Assert.IsFalse(s.IsDisposed);
  1263. var d = s.Subscribe(_ => { });
  1264. Assert.IsTrue(s.HasObservers);
  1265. Assert.IsFalse(s.IsDisposed);
  1266. d.Dispose();
  1267. Assert.IsFalse(s.HasObservers);
  1268. Assert.IsFalse(s.IsDisposed);
  1269. s.Dispose();
  1270. Assert.IsFalse(s.HasObservers);
  1271. Assert.IsTrue(s.IsDisposed);
  1272. }
  1273. [TestMethod]
  1274. public void HasObservers_Dispose3()
  1275. {
  1276. HasObservers_Dispose3(new ReplaySubject<int>());
  1277. HasObservers_Dispose3(new ReplaySubject<int>(1));
  1278. HasObservers_Dispose3(new ReplaySubject<int>(3));
  1279. HasObservers_Dispose3(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1280. }
  1281. private static void HasObservers_Dispose3(ReplaySubject<int> s)
  1282. {
  1283. Assert.IsFalse(s.HasObservers);
  1284. Assert.IsFalse(s.IsDisposed);
  1285. s.Dispose();
  1286. Assert.IsFalse(s.HasObservers);
  1287. Assert.IsTrue(s.IsDisposed);
  1288. }
  1289. [TestMethod]
  1290. public void HasObservers_OnCompleted()
  1291. {
  1292. HasObservers_OnCompleted(new ReplaySubject<int>());
  1293. HasObservers_OnCompleted(new ReplaySubject<int>(1));
  1294. HasObservers_OnCompleted(new ReplaySubject<int>(3));
  1295. HasObservers_OnCompleted(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1296. }
  1297. private static void HasObservers_OnCompleted(ReplaySubject<int> s)
  1298. {
  1299. Assert.IsFalse(s.HasObservers);
  1300. var d = s.Subscribe(_ => { });
  1301. Assert.IsTrue(s.HasObservers);
  1302. s.OnNext(42);
  1303. Assert.IsTrue(s.HasObservers);
  1304. s.OnCompleted();
  1305. Assert.IsFalse(s.HasObservers);
  1306. }
  1307. [TestMethod]
  1308. public void HasObservers_OnError()
  1309. {
  1310. HasObservers_OnError(new ReplaySubject<int>());
  1311. HasObservers_OnError(new ReplaySubject<int>(1));
  1312. HasObservers_OnError(new ReplaySubject<int>(3));
  1313. HasObservers_OnError(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1314. }
  1315. private static void HasObservers_OnError(ReplaySubject<int> s)
  1316. {
  1317. Assert.IsFalse(s.HasObservers);
  1318. var d = s.Subscribe(_ => { }, ex => { });
  1319. Assert.IsTrue(s.HasObservers);
  1320. s.OnNext(42);
  1321. Assert.IsTrue(s.HasObservers);
  1322. s.OnError(new Exception());
  1323. Assert.IsFalse(s.HasObservers);
  1324. }
  1325. [TestMethod]
  1326. public void Completed_to_late_subscriber_ReplayAll()
  1327. {
  1328. var s = new ReplaySubject<int>();
  1329. s.OnNext(1);
  1330. s.OnNext(2);
  1331. s.OnCompleted();
  1332. var scheduler = new TestScheduler();
  1333. var observer = scheduler.CreateObserver<int>();
  1334. s.Subscribe(observer);
  1335. Assert.AreEqual(3, observer.Messages.Count);
  1336. Assert.AreEqual(1, observer.Messages[0].Value.Value);
  1337. Assert.AreEqual(2, observer.Messages[1].Value.Value);
  1338. Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind);
  1339. }
  1340. [TestMethod]
  1341. public void Completed_to_late_subscriber_ReplayOne()
  1342. {
  1343. var s = new ReplaySubject<int>(1);
  1344. s.OnNext(1);
  1345. s.OnNext(2);
  1346. s.OnCompleted();
  1347. var scheduler = new TestScheduler();
  1348. var observer = scheduler.CreateObserver<int>();
  1349. s.Subscribe(observer);
  1350. Assert.AreEqual(2, observer.Messages.Count);
  1351. Assert.AreEqual(2, observer.Messages[0].Value.Value);
  1352. Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[1].Value.Kind);
  1353. }
  1354. [TestMethod]
  1355. public void Completed_to_late_subscriber_ReplayMany()
  1356. {
  1357. var s = new ReplaySubject<int>(2);
  1358. s.OnNext(1);
  1359. s.OnNext(2);
  1360. s.OnNext(3);
  1361. s.OnCompleted();
  1362. var scheduler = new TestScheduler();
  1363. var observer = scheduler.CreateObserver<int>();
  1364. s.Subscribe(observer);
  1365. Assert.AreEqual(3, observer.Messages.Count);
  1366. Assert.AreEqual(2, observer.Messages[0].Value.Value);
  1367. Assert.AreEqual(3, observer.Messages[1].Value.Value);
  1368. Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind);
  1369. }
  1370. [TestMethod]
  1371. public void Completed_to_late_subscriber_ReplayByTime()
  1372. {
  1373. var s = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
  1374. s.OnNext(1);
  1375. s.OnNext(2);
  1376. s.OnNext(3);
  1377. s.OnCompleted();
  1378. var scheduler = new TestScheduler();
  1379. var observer = scheduler.CreateObserver<int>();
  1380. s.Subscribe(observer);
  1381. Assert.AreEqual(4, observer.Messages.Count);
  1382. Assert.AreEqual(1, observer.Messages[0].Value.Value);
  1383. Assert.AreEqual(2, observer.Messages[1].Value.Value);
  1384. Assert.AreEqual(3, observer.Messages[2].Value.Value);
  1385. Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[3].Value.Kind);
  1386. }
  1387. [TestMethod]
  1388. public void Errored_to_late_subscriber_ReplayAll()
  1389. {
  1390. var expectedException = new Exception("Test");
  1391. var s = new ReplaySubject<int>();
  1392. s.OnNext(1);
  1393. s.OnNext(2);
  1394. s.OnError(expectedException);
  1395. var scheduler = new TestScheduler();
  1396. var observer = scheduler.CreateObserver<int>();
  1397. s.Subscribe(observer);
  1398. Assert.AreEqual(3, observer.Messages.Count);
  1399. Assert.AreEqual(1, observer.Messages[0].Value.Value);
  1400. Assert.AreEqual(2, observer.Messages[1].Value.Value);
  1401. Assert.AreEqual(NotificationKind.OnError, observer.Messages[2].Value.Kind);
  1402. Assert.AreEqual(expectedException, observer.Messages[2].Value.Exception);
  1403. }
  1404. [TestMethod]
  1405. public void Errored_to_late_subscriber_ReplayOne()
  1406. {
  1407. var expectedException = new Exception("Test");
  1408. var s = new ReplaySubject<int>(1);
  1409. s.OnNext(1);
  1410. s.OnNext(2);
  1411. s.OnError(expectedException);
  1412. var scheduler = new TestScheduler();
  1413. var observer = scheduler.CreateObserver<int>();
  1414. s.Subscribe(observer);
  1415. Assert.AreEqual(2, observer.Messages.Count);
  1416. Assert.AreEqual(2, observer.Messages[0].Value.Value);
  1417. Assert.AreEqual(NotificationKind.OnError, observer.Messages[1].Value.Kind);
  1418. Assert.AreEqual(expectedException, observer.Messages[1].Value.Exception);
  1419. }
  1420. [TestMethod]
  1421. public void Errored_to_late_subscriber_ReplayMany()
  1422. {
  1423. var expectedException = new Exception("Test");
  1424. var s = new ReplaySubject<int>(2);
  1425. s.OnNext(1);
  1426. s.OnNext(2);
  1427. s.OnNext(3);
  1428. s.OnError(expectedException);
  1429. var scheduler = new TestScheduler();
  1430. var observer = scheduler.CreateObserver<int>();
  1431. s.Subscribe(observer);
  1432. Assert.AreEqual(3, observer.Messages.Count);
  1433. Assert.AreEqual(2, observer.Messages[0].Value.Value);
  1434. Assert.AreEqual(3, observer.Messages[1].Value.Value);
  1435. Assert.AreEqual(NotificationKind.OnError, observer.Messages[2].Value.Kind);
  1436. Assert.AreEqual(expectedException, observer.Messages[2].Value.Exception);
  1437. }
  1438. [TestMethod]
  1439. public void Errored_to_late_subscriber_ReplayByTime()
  1440. {
  1441. var expectedException = new Exception("Test");
  1442. var s = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
  1443. s.OnNext(1);
  1444. s.OnNext(2);
  1445. s.OnNext(3);
  1446. s.OnError(expectedException);
  1447. var scheduler = new TestScheduler();
  1448. var observer = scheduler.CreateObserver<int>();
  1449. s.Subscribe(observer);
  1450. Assert.AreEqual(4, observer.Messages.Count);
  1451. Assert.AreEqual(1, observer.Messages[0].Value.Value);
  1452. Assert.AreEqual(2, observer.Messages[1].Value.Value);
  1453. Assert.AreEqual(3, observer.Messages[2].Value.Value);
  1454. Assert.AreEqual(NotificationKind.OnError, observer.Messages[3].Value.Kind);
  1455. Assert.AreEqual(expectedException, observer.Messages[3].Value.Exception);
  1456. }
  1457. [TestMethod]
  1458. public void ReplaySubject_Reentrant()
  1459. {
  1460. var r = new ReplaySubject<int>(4);
  1461. r.OnNext(0);
  1462. r.OnNext(1);
  1463. r.OnNext(2);
  1464. r.OnNext(3);
  1465. r.OnNext(4);
  1466. var xs = new List<int>();
  1467. var i = 0;
  1468. r.Subscribe(x =>
  1469. {
  1470. xs.Add(x);
  1471. if (++i <= 10)
  1472. {
  1473. r.OnNext(x);
  1474. }
  1475. });
  1476. r.OnNext(5);
  1477. Assert.IsTrue(xs.SequenceEqual(new[]
  1478. {
  1479. 1, 2, 3, 4, // original
  1480. 1, 2, 3, 4, // reentrant (+ fed back)
  1481. 1, 2, 3, 4, // reentrant (+ first two fed back)
  1482. 1, 2, // reentrant
  1483. 5 // tune in
  1484. }));
  1485. }
  1486. [TestMethod]
  1487. public void FastImmediateObserver_Simple1()
  1488. {
  1489. var res = FastImmediateObserverTest(fio =>
  1490. {
  1491. fio.OnNext(1);
  1492. fio.OnNext(2);
  1493. fio.OnNext(3);
  1494. fio.OnCompleted();
  1495. fio.EnsureActive(4);
  1496. });
  1497. res.AssertEqual(
  1498. OnNext(0, 1),
  1499. OnNext(1, 2),
  1500. OnNext(2, 3),
  1501. OnCompleted<int>(3)
  1502. );
  1503. }
  1504. [TestMethod]
  1505. public void FastImmediateObserver_Simple2()
  1506. {
  1507. var ex = new Exception();
  1508. var res = FastImmediateObserverTest(fio =>
  1509. {
  1510. fio.OnNext(1);
  1511. fio.OnNext(2);
  1512. fio.OnNext(3);
  1513. fio.OnError(ex);
  1514. fio.EnsureActive(4);
  1515. });
  1516. res.AssertEqual(
  1517. OnNext(0, 1),
  1518. OnNext(1, 2),
  1519. OnNext(2, 3),
  1520. OnError<int>(3, ex)
  1521. );
  1522. }
  1523. [TestMethod]
  1524. public void FastImmediateObserver_Simple3()
  1525. {
  1526. var res = FastImmediateObserverTest(fio =>
  1527. {
  1528. fio.OnNext(1);
  1529. fio.EnsureActive();
  1530. fio.OnNext(2);
  1531. fio.EnsureActive();
  1532. fio.OnNext(3);
  1533. fio.EnsureActive();
  1534. fio.OnCompleted();
  1535. fio.EnsureActive();
  1536. });
  1537. res.AssertEqual(
  1538. OnNext(0, 1),
  1539. OnNext(1, 2),
  1540. OnNext(2, 3),
  1541. OnCompleted<int>(3)
  1542. );
  1543. }
  1544. [TestMethod]
  1545. public void FastImmediateObserver_Fault()
  1546. {
  1547. var xs = new List<int>();
  1548. var o = Observer.Create<int>(
  1549. x => { xs.Add(x); if (x == 2) throw new Exception(); },
  1550. ex => { },
  1551. () => { }
  1552. );
  1553. var fio = new FastImmediateObserver<int>(o);
  1554. fio.OnNext(1);
  1555. fio.OnNext(2);
  1556. fio.OnNext(3);
  1557. ReactiveAssert.Throws<Exception>(() => fio.EnsureActive());
  1558. fio.OnNext(4);
  1559. fio.EnsureActive();
  1560. fio.OnNext(2);
  1561. fio.EnsureActive();
  1562. Assert.IsTrue(xs.Count == 2);
  1563. }
  1564. #if !NO_TPL
  1565. [TestMethod]
  1566. public void FastImmediateObserver_Ownership1()
  1567. {
  1568. var xs = new List<int>();
  1569. var o = Observer.Create<int>(
  1570. xs.Add,
  1571. ex => { },
  1572. () => { }
  1573. );
  1574. var fio = new FastImmediateObserver<int>(o);
  1575. var ts = new Task[16];
  1576. var N = 100;
  1577. for (var i = 0; i < ts.Length; i++)
  1578. {
  1579. var j = i;
  1580. ts[i] = Task.Factory.StartNew(() =>
  1581. {
  1582. for (var k = 0; k < N; k++)
  1583. {
  1584. fio.OnNext(j * N + k);
  1585. }
  1586. fio.EnsureActive(N);
  1587. });
  1588. }
  1589. Task.WaitAll(ts);
  1590. Assert.IsTrue(xs.Count == ts.Length * N);
  1591. }
  1592. [TestMethod]
  1593. public void FastImmediateObserver_Ownership2()
  1594. {
  1595. var cd = new CountdownEvent(3);
  1596. var w = new ManualResetEvent(false);
  1597. var e = new ManualResetEvent(false);
  1598. var xs = new List<int>();
  1599. var o = Observer.Create<int>(
  1600. x => { xs.Add(x); w.Set(); e.WaitOne(); cd.Signal(); },
  1601. ex => { },
  1602. () => { }
  1603. );
  1604. var fio = new FastImmediateObserver<int>(o);
  1605. fio.OnNext(1);
  1606. var t = Task.Factory.StartNew(() =>
  1607. {
  1608. fio.EnsureActive();
  1609. });
  1610. w.WaitOne();
  1611. fio.OnNext(2);
  1612. fio.OnNext(3);
  1613. fio.EnsureActive(2);
  1614. e.Set();
  1615. cd.Wait();
  1616. Assert.IsTrue(xs.Count == 3);
  1617. }
  1618. #endif
  1619. private IEnumerable<Recorded<Notification<int>>> FastImmediateObserverTest(Action<IScheduledObserver<int>> f)
  1620. {
  1621. var ns = new List<Recorded<Notification<int>>>();
  1622. var l = 0L;
  1623. var o = Observer.Create<int>(
  1624. x => { ns.Add(OnNext<int>(l++, x)); },
  1625. ex => { ns.Add(OnError<int>(l++, ex)); },
  1626. () => { ns.Add(OnCompleted<int>(l++)); }
  1627. );
  1628. var fio = new FastImmediateObserver<int>(o);
  1629. f(fio);
  1630. return ns;
  1631. }
  1632. }
  1633. }