ReplaySubjectTest.cs 72 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Subjects;
  10. using System.Threading;
  11. using System.Threading.Tasks;
  12. using Microsoft.Reactive.Testing;
  13. using ReactiveTests.Dummies;
  14. using Microsoft.VisualStudio.TestTools.UnitTesting;
  15. using Assert = Xunit.Assert;
  16. namespace ReactiveTests.Tests
  17. {
  18. [TestClass]
  19. public partial class ReplaySubjectTest : ReactiveTest
  20. {
  21. [TestMethod]
  22. public void Subscribe_ArgumentChecking()
  23. {
  24. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>().Subscribe(null));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(1).Subscribe(null));
  26. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(2).Subscribe(null));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(DummyScheduler.Instance).Subscribe(null));
  28. }
  29. [TestMethod]
  30. public void OnError_ArgumentChecking()
  31. {
  32. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>().OnError(null));
  33. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(1).OnError(null));
  34. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(2).OnError(null));
  35. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(DummyScheduler.Instance).OnError(null));
  36. }
  37. [TestMethod]
  38. public void Constructor_ArgumentChecking()
  39. {
  40. #pragma warning disable CA1806 // (Unused new instance.) We are just testing whether or not the constructor throws.
  41. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1));
  42. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, DummyScheduler.Instance));
  43. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, TimeSpan.Zero));
  44. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, TimeSpan.Zero, DummyScheduler.Instance));
  45. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(TimeSpan.FromTicks(-1)));
  46. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  47. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1)));
  48. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  49. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(null));
  50. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(0, null));
  51. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(TimeSpan.Zero, null));
  52. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(0, TimeSpan.Zero, null));
  53. // zero allowed
  54. new ReplaySubject<int>(0);
  55. new ReplaySubject<int>(TimeSpan.Zero);
  56. new ReplaySubject<int>(0, TimeSpan.Zero);
  57. new ReplaySubject<int>(0, DummyScheduler.Instance);
  58. new ReplaySubject<int>(TimeSpan.Zero, DummyScheduler.Instance);
  59. new ReplaySubject<int>(0, TimeSpan.Zero, DummyScheduler.Instance);
  60. #pragma warning restore CA1806
  61. }
  62. [TestMethod]
  63. public void Infinite_ReplayByTime()
  64. {
  65. var scheduler = new TestScheduler();
  66. var xs = scheduler.CreateHotObservable(
  67. OnNext(70, 1),
  68. OnNext(110, 2),
  69. OnNext(220, 3),
  70. OnNext(270, 4),
  71. OnNext(340, 5),
  72. OnNext(410, 6),
  73. OnNext(520, 7),
  74. OnNext(630, 8),
  75. OnNext(710, 9),
  76. OnNext(870, 10),
  77. OnNext(940, 11),
  78. OnNext(1020, 12)
  79. );
  80. var subject = default(ReplaySubject<int>);
  81. var subscription = default(IDisposable);
  82. var results1 = scheduler.CreateObserver<int>();
  83. var subscription1 = default(IDisposable);
  84. var results2 = scheduler.CreateObserver<int>();
  85. var subscription2 = default(IDisposable);
  86. var results3 = scheduler.CreateObserver<int>();
  87. var subscription3 = default(IDisposable);
  88. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  89. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  90. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  91. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  92. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  93. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  94. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  95. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  96. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  97. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  98. scheduler.Start();
  99. results1.Messages.AssertEqual(
  100. OnNext(301, 3),
  101. OnNext(302, 4),
  102. OnNext(341, 5),
  103. OnNext(411, 6),
  104. OnNext(521, 7)
  105. );
  106. results2.Messages.AssertEqual(
  107. OnNext(401, 5),
  108. OnNext(411, 6),
  109. OnNext(521, 7),
  110. OnNext(631, 8)
  111. );
  112. results3.Messages.AssertEqual(
  113. OnNext(901, 10),
  114. OnNext(941, 11)
  115. );
  116. }
  117. [TestMethod]
  118. public void Infinite_ReplayOne()
  119. {
  120. var scheduler = new TestScheduler();
  121. var xs = scheduler.CreateHotObservable(
  122. OnNext(70, 1),
  123. OnNext(110, 2),
  124. OnNext(220, 3),
  125. OnNext(270, 4),
  126. OnNext(340, 5),
  127. OnNext(410, 6),
  128. OnNext(520, 7),
  129. OnNext(630, 8),
  130. OnNext(710, 9),
  131. OnNext(870, 10),
  132. OnNext(940, 11),
  133. OnNext(1020, 12)
  134. );
  135. var subject = default(ReplaySubject<int>);
  136. var subscription = default(IDisposable);
  137. var results1 = scheduler.CreateObserver<int>();
  138. var subscription1 = default(IDisposable);
  139. var results2 = scheduler.CreateObserver<int>();
  140. var subscription2 = default(IDisposable);
  141. var results3 = scheduler.CreateObserver<int>();
  142. var subscription3 = default(IDisposable);
  143. var results4 = scheduler.CreateObserver<int>();
  144. var subscription4 = default(IDisposable);
  145. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  146. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  147. scheduler.ScheduleAbsolute(1200, () => subscription.Dispose());
  148. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  149. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  150. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  151. scheduler.ScheduleAbsolute(1100, () => subscription4 = subject.Subscribe(results4));
  152. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  153. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  154. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  155. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  156. scheduler.Start();
  157. results1.Messages.AssertEqual(
  158. OnNext(300, 4),
  159. OnNext(340, 5),
  160. OnNext(410, 6),
  161. OnNext(520, 7)
  162. );
  163. results2.Messages.AssertEqual(
  164. OnNext(400, 5),
  165. OnNext(410, 6),
  166. OnNext(520, 7),
  167. OnNext(630, 8)
  168. );
  169. results3.Messages.AssertEqual(
  170. OnNext(900, 10),
  171. OnNext(940, 11)
  172. );
  173. results4.Messages.AssertEqual(
  174. OnNext(1100, 12)
  175. );
  176. }
  177. [TestMethod]
  178. public void Infinite_ReplayMany()
  179. {
  180. var scheduler = new TestScheduler();
  181. var xs = scheduler.CreateHotObservable(
  182. OnNext(70, 1),
  183. OnNext(110, 2),
  184. OnNext(220, 3),
  185. OnNext(270, 4),
  186. OnNext(340, 5),
  187. OnNext(410, 6),
  188. OnNext(520, 7),
  189. OnNext(630, 8),
  190. OnNext(710, 9),
  191. OnNext(870, 10),
  192. OnNext(940, 11),
  193. OnNext(1020, 12)
  194. );
  195. var subject = default(ReplaySubject<int>);
  196. var subscription = default(IDisposable);
  197. var results1 = scheduler.CreateObserver<int>();
  198. var subscription1 = default(IDisposable);
  199. var results2 = scheduler.CreateObserver<int>();
  200. var subscription2 = default(IDisposable);
  201. var results3 = scheduler.CreateObserver<int>();
  202. var subscription3 = default(IDisposable);
  203. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  204. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  205. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  206. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  207. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  208. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  209. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  210. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  211. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  212. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  213. scheduler.Start();
  214. results1.Messages.AssertEqual(
  215. OnNext(300, 3),
  216. OnNext(300, 4),
  217. OnNext(340, 5),
  218. OnNext(410, 6),
  219. OnNext(520, 7)
  220. );
  221. results2.Messages.AssertEqual(
  222. OnNext(400, 3),
  223. OnNext(400, 4),
  224. OnNext(400, 5),
  225. OnNext(410, 6),
  226. OnNext(520, 7),
  227. OnNext(630, 8)
  228. );
  229. results3.Messages.AssertEqual(
  230. OnNext(900, 8),
  231. OnNext(900, 9),
  232. OnNext(900, 10),
  233. OnNext(940, 11)
  234. );
  235. }
  236. [TestMethod]
  237. public void Infinite_ReplayAll()
  238. {
  239. var scheduler = new TestScheduler();
  240. var xs = scheduler.CreateHotObservable(
  241. OnNext(70, 1),
  242. OnNext(110, 2),
  243. OnNext(220, 3),
  244. OnNext(270, 4),
  245. OnNext(340, 5),
  246. OnNext(410, 6),
  247. OnNext(520, 7),
  248. OnNext(630, 8),
  249. OnNext(710, 9),
  250. OnNext(870, 10),
  251. OnNext(940, 11),
  252. OnNext(1020, 12)
  253. );
  254. var subject = default(ReplaySubject<int>);
  255. var subscription = default(IDisposable);
  256. var results1 = scheduler.CreateObserver<int>();
  257. var subscription1 = default(IDisposable);
  258. var results2 = scheduler.CreateObserver<int>();
  259. var subscription2 = default(IDisposable);
  260. var results3 = scheduler.CreateObserver<int>();
  261. var subscription3 = default(IDisposable);
  262. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  263. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  264. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  265. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  266. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  267. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  268. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  269. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  270. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  271. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  272. scheduler.Start();
  273. results1.Messages.AssertEqual(
  274. OnNext(300, 3),
  275. OnNext(300, 4),
  276. OnNext(340, 5),
  277. OnNext(410, 6),
  278. OnNext(520, 7)
  279. );
  280. results2.Messages.AssertEqual(
  281. OnNext(400, 3),
  282. OnNext(400, 4),
  283. OnNext(400, 5),
  284. OnNext(410, 6),
  285. OnNext(520, 7),
  286. OnNext(630, 8)
  287. );
  288. results3.Messages.AssertEqual(
  289. OnNext(900, 3),
  290. OnNext(900, 4),
  291. OnNext(900, 5),
  292. OnNext(900, 6),
  293. OnNext(900, 7),
  294. OnNext(900, 8),
  295. OnNext(900, 9),
  296. OnNext(900, 10),
  297. OnNext(940, 11)
  298. );
  299. }
  300. [TestMethod]
  301. public void Infinite2()
  302. {
  303. var scheduler = new TestScheduler();
  304. var xs = scheduler.CreateHotObservable(
  305. OnNext(70, 1),
  306. OnNext(110, 2),
  307. OnNext(220, 3),
  308. OnNext(270, 4),
  309. OnNext(280, -1),
  310. OnNext(290, -2),
  311. OnNext(340, 5),
  312. OnNext(410, 6),
  313. OnNext(520, 7),
  314. OnNext(630, 8),
  315. OnNext(710, 9),
  316. OnNext(870, 10),
  317. OnNext(940, 11),
  318. OnNext(1020, 12)
  319. );
  320. var subject = default(ReplaySubject<int>);
  321. var subscription = default(IDisposable);
  322. var results1 = scheduler.CreateObserver<int>();
  323. var subscription1 = default(IDisposable);
  324. var results2 = scheduler.CreateObserver<int>();
  325. var subscription2 = default(IDisposable);
  326. var results3 = scheduler.CreateObserver<int>();
  327. var subscription3 = default(IDisposable);
  328. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  329. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  330. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  331. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  332. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  333. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  334. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  335. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  336. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  337. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  338. scheduler.Start();
  339. results1.Messages.AssertEqual(
  340. OnNext(301, 4),
  341. OnNext(302, -1),
  342. OnNext(303, -2),
  343. OnNext(341, 5),
  344. OnNext(411, 6),
  345. OnNext(521, 7)
  346. );
  347. results2.Messages.AssertEqual(
  348. OnNext(401, 5),
  349. OnNext(411, 6),
  350. OnNext(521, 7),
  351. OnNext(631, 8)
  352. );
  353. results3.Messages.AssertEqual(
  354. OnNext(901, 10),
  355. OnNext(941, 11)
  356. );
  357. }
  358. [TestMethod]
  359. public void Finite_ReplayByTime()
  360. {
  361. var scheduler = new TestScheduler();
  362. var xs = scheduler.CreateHotObservable(
  363. OnNext(70, 1),
  364. OnNext(110, 2),
  365. OnNext(220, 3),
  366. OnNext(270, 4),
  367. OnNext(340, 5),
  368. OnNext(410, 6),
  369. OnNext(520, 7),
  370. OnCompleted<int>(630),
  371. OnNext(640, 9),
  372. OnCompleted<int>(650),
  373. OnError<int>(660, new Exception())
  374. );
  375. var subject = default(ReplaySubject<int>);
  376. var subscription = default(IDisposable);
  377. var results1 = scheduler.CreateObserver<int>();
  378. var subscription1 = default(IDisposable);
  379. var results2 = scheduler.CreateObserver<int>();
  380. var subscription2 = default(IDisposable);
  381. var results3 = scheduler.CreateObserver<int>();
  382. var subscription3 = default(IDisposable);
  383. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  384. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  385. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  386. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  387. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  388. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  389. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  390. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  391. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  392. scheduler.Start();
  393. results1.Messages.AssertEqual(
  394. OnNext(301, 3),
  395. OnNext(302, 4),
  396. OnNext(341, 5),
  397. OnNext(411, 6),
  398. OnNext(521, 7)
  399. );
  400. results2.Messages.AssertEqual(
  401. OnNext(401, 5),
  402. OnNext(411, 6),
  403. OnNext(521, 7),
  404. OnCompleted<int>(631)
  405. );
  406. results3.Messages.AssertEqual(
  407. OnCompleted<int>(901)
  408. );
  409. }
  410. [TestMethod]
  411. public void Finite_ReplayOne()
  412. {
  413. var scheduler = new TestScheduler();
  414. var xs = scheduler.CreateHotObservable(
  415. OnNext(70, 1),
  416. OnNext(110, 2),
  417. OnNext(220, 3),
  418. OnNext(270, 4),
  419. OnNext(340, 5),
  420. OnNext(410, 6),
  421. OnNext(520, 7),
  422. OnCompleted<int>(630),
  423. OnNext(640, 9),
  424. OnCompleted<int>(650),
  425. OnError<int>(660, new Exception())
  426. );
  427. var subject = default(ReplaySubject<int>);
  428. var subscription = default(IDisposable);
  429. var results1 = scheduler.CreateObserver<int>();
  430. var subscription1 = default(IDisposable);
  431. var results2 = scheduler.CreateObserver<int>();
  432. var subscription2 = default(IDisposable);
  433. var results3 = scheduler.CreateObserver<int>();
  434. var subscription3 = default(IDisposable);
  435. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  436. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  437. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  438. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  439. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  440. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  441. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  442. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  443. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  444. scheduler.Start();
  445. results1.Messages.AssertEqual(
  446. OnNext(300, 4),
  447. OnNext(340, 5),
  448. OnNext(410, 6),
  449. OnNext(520, 7)
  450. );
  451. results2.Messages.AssertEqual(
  452. OnNext(400, 5),
  453. OnNext(410, 6),
  454. OnNext(520, 7),
  455. OnCompleted<int>(630)
  456. );
  457. results3.Messages.AssertEqual(
  458. OnNext(900, 7),
  459. OnCompleted<int>(900)
  460. );
  461. }
  462. [TestMethod]
  463. public void Finite_ReplayMany()
  464. {
  465. var scheduler = new TestScheduler();
  466. var xs = scheduler.CreateHotObservable(
  467. OnNext(70, 1),
  468. OnNext(110, 2),
  469. OnNext(220, 3),
  470. OnNext(270, 4),
  471. OnNext(340, 5),
  472. OnNext(410, 6),
  473. OnNext(520, 7),
  474. OnCompleted<int>(630),
  475. OnNext(640, 9),
  476. OnCompleted<int>(650),
  477. OnError<int>(660, new Exception())
  478. );
  479. var subject = default(ReplaySubject<int>);
  480. var subscription = default(IDisposable);
  481. var results1 = scheduler.CreateObserver<int>();
  482. var subscription1 = default(IDisposable);
  483. var results2 = scheduler.CreateObserver<int>();
  484. var subscription2 = default(IDisposable);
  485. var results3 = scheduler.CreateObserver<int>();
  486. var subscription3 = default(IDisposable);
  487. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  488. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  489. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  490. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  491. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  492. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  493. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  494. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  495. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  496. scheduler.Start();
  497. results1.Messages.AssertEqual(
  498. OnNext(300, 3),
  499. OnNext(300, 4),
  500. OnNext(340, 5),
  501. OnNext(410, 6),
  502. OnNext(520, 7)
  503. );
  504. results2.Messages.AssertEqual(
  505. OnNext(400, 3),
  506. OnNext(400, 4),
  507. OnNext(400, 5),
  508. OnNext(410, 6),
  509. OnNext(520, 7),
  510. OnCompleted<int>(630)
  511. );
  512. results3.Messages.AssertEqual(
  513. OnNext(900, 5),
  514. OnNext(900, 6),
  515. OnNext(900, 7),
  516. OnCompleted<int>(900)
  517. );
  518. }
  519. [TestMethod]
  520. public void Finite_ReplayAll()
  521. {
  522. var scheduler = new TestScheduler();
  523. var xs = scheduler.CreateHotObservable(
  524. OnNext(70, 1),
  525. OnNext(110, 2),
  526. OnNext(220, 3),
  527. OnNext(270, 4),
  528. OnNext(340, 5),
  529. OnNext(410, 6),
  530. OnNext(520, 7),
  531. OnCompleted<int>(630),
  532. OnNext(640, 9),
  533. OnCompleted<int>(650),
  534. OnError<int>(660, new Exception())
  535. );
  536. var subject = default(ReplaySubject<int>);
  537. var subscription = default(IDisposable);
  538. var results1 = scheduler.CreateObserver<int>();
  539. var subscription1 = default(IDisposable);
  540. var results2 = scheduler.CreateObserver<int>();
  541. var subscription2 = default(IDisposable);
  542. var results3 = scheduler.CreateObserver<int>();
  543. var subscription3 = default(IDisposable);
  544. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  545. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  546. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  547. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  548. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  549. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  550. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  551. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  552. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  553. scheduler.Start();
  554. results1.Messages.AssertEqual(
  555. OnNext(300, 3),
  556. OnNext(300, 4),
  557. OnNext(340, 5),
  558. OnNext(410, 6),
  559. OnNext(520, 7)
  560. );
  561. results2.Messages.AssertEqual(
  562. OnNext(400, 3),
  563. OnNext(400, 4),
  564. OnNext(400, 5),
  565. OnNext(410, 6),
  566. OnNext(520, 7),
  567. OnCompleted<int>(630)
  568. );
  569. results3.Messages.AssertEqual(
  570. OnNext(900, 3),
  571. OnNext(900, 4),
  572. OnNext(900, 5),
  573. OnNext(900, 6),
  574. OnNext(900, 7),
  575. OnCompleted<int>(900)
  576. );
  577. }
  578. [TestMethod]
  579. public void Error_ReplayByTime()
  580. {
  581. var scheduler = new TestScheduler();
  582. var ex = new Exception();
  583. var xs = scheduler.CreateHotObservable(
  584. OnNext(70, 1),
  585. OnNext(110, 2),
  586. OnNext(220, 3),
  587. OnNext(270, 4),
  588. OnNext(340, 5),
  589. OnNext(410, 6),
  590. OnNext(520, 7),
  591. OnError<int>(630, ex),
  592. OnNext(640, 9),
  593. OnCompleted<int>(650),
  594. OnError<int>(660, new Exception())
  595. );
  596. var subject = default(ReplaySubject<int>);
  597. var subscription = default(IDisposable);
  598. var results1 = scheduler.CreateObserver<int>();
  599. var subscription1 = default(IDisposable);
  600. var results2 = scheduler.CreateObserver<int>();
  601. var subscription2 = default(IDisposable);
  602. var results3 = scheduler.CreateObserver<int>();
  603. var subscription3 = default(IDisposable);
  604. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  605. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  606. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  607. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  608. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  609. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  610. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  611. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  612. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  613. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  614. scheduler.Start();
  615. results1.Messages.AssertEqual(
  616. OnNext(301, 3),
  617. OnNext(302, 4),
  618. OnNext(341, 5),
  619. OnNext(411, 6),
  620. OnNext(521, 7)
  621. );
  622. results2.Messages.AssertEqual(
  623. OnNext(401, 5),
  624. OnNext(411, 6),
  625. OnNext(521, 7),
  626. OnError<int>(631, ex)
  627. );
  628. results3.Messages.AssertEqual(
  629. OnError<int>(901, ex)
  630. );
  631. }
  632. [TestMethod]
  633. public void Error_ReplayOne()
  634. {
  635. var scheduler = new TestScheduler();
  636. var ex = new Exception();
  637. var xs = scheduler.CreateHotObservable(
  638. OnNext(70, 1),
  639. OnNext(110, 2),
  640. OnNext(220, 3),
  641. OnNext(270, 4),
  642. OnNext(340, 5),
  643. OnNext(410, 6),
  644. OnNext(520, 7),
  645. OnError<int>(630, ex),
  646. OnNext(640, 9),
  647. OnCompleted<int>(650),
  648. OnError<int>(660, new Exception())
  649. );
  650. var subject = default(ReplaySubject<int>);
  651. var subscription = default(IDisposable);
  652. var results1 = scheduler.CreateObserver<int>();
  653. var subscription1 = default(IDisposable);
  654. var results2 = scheduler.CreateObserver<int>();
  655. var subscription2 = default(IDisposable);
  656. var results3 = scheduler.CreateObserver<int>();
  657. var subscription3 = default(IDisposable);
  658. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  659. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  660. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  661. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  662. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  663. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  664. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  665. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  666. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  667. scheduler.Start();
  668. results1.Messages.AssertEqual(
  669. OnNext(300, 4),
  670. OnNext(340, 5),
  671. OnNext(410, 6),
  672. OnNext(520, 7)
  673. );
  674. results2.Messages.AssertEqual(
  675. OnNext(400, 5),
  676. OnNext(410, 6),
  677. OnNext(520, 7),
  678. OnError<int>(630, ex)
  679. );
  680. results3.Messages.AssertEqual(
  681. OnNext(900, 7),
  682. OnError<int>(900, ex)
  683. );
  684. }
  685. [TestMethod]
  686. public void Error_ReplayMany()
  687. {
  688. var scheduler = new TestScheduler();
  689. var ex = new Exception();
  690. var xs = scheduler.CreateHotObservable(
  691. OnNext(70, 1),
  692. OnNext(110, 2),
  693. OnNext(220, 3),
  694. OnNext(270, 4),
  695. OnNext(340, 5),
  696. OnNext(410, 6),
  697. OnNext(520, 7),
  698. OnError<int>(630, ex),
  699. OnNext(640, 9),
  700. OnCompleted<int>(650),
  701. OnError<int>(660, new Exception())
  702. );
  703. var subject = default(ReplaySubject<int>);
  704. var subscription = default(IDisposable);
  705. var results1 = scheduler.CreateObserver<int>();
  706. var subscription1 = default(IDisposable);
  707. var results2 = scheduler.CreateObserver<int>();
  708. var subscription2 = default(IDisposable);
  709. var results3 = scheduler.CreateObserver<int>();
  710. var subscription3 = default(IDisposable);
  711. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  712. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  713. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  714. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  715. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  716. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  717. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  718. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  719. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  720. scheduler.Start();
  721. results1.Messages.AssertEqual(
  722. OnNext(300, 3),
  723. OnNext(300, 4),
  724. OnNext(340, 5),
  725. OnNext(410, 6),
  726. OnNext(520, 7)
  727. );
  728. results2.Messages.AssertEqual(
  729. OnNext(400, 3),
  730. OnNext(400, 4),
  731. OnNext(400, 5),
  732. OnNext(410, 6),
  733. OnNext(520, 7),
  734. OnError<int>(630, ex)
  735. );
  736. results3.Messages.AssertEqual(
  737. OnNext(900, 5),
  738. OnNext(900, 6),
  739. OnNext(900, 7),
  740. OnError<int>(900, ex)
  741. );
  742. }
  743. [TestMethod]
  744. public void Error_ReplayAll()
  745. {
  746. var scheduler = new TestScheduler();
  747. var ex = new Exception();
  748. var xs = scheduler.CreateHotObservable(
  749. OnNext(70, 1),
  750. OnNext(110, 2),
  751. OnNext(220, 3),
  752. OnNext(270, 4),
  753. OnNext(340, 5),
  754. OnNext(410, 6),
  755. OnNext(520, 7),
  756. OnError<int>(630, ex),
  757. OnNext(640, 9),
  758. OnCompleted<int>(650),
  759. OnError<int>(660, new Exception())
  760. );
  761. var subject = default(ReplaySubject<int>);
  762. var subscription = default(IDisposable);
  763. var results1 = scheduler.CreateObserver<int>();
  764. var subscription1 = default(IDisposable);
  765. var results2 = scheduler.CreateObserver<int>();
  766. var subscription2 = default(IDisposable);
  767. var results3 = scheduler.CreateObserver<int>();
  768. var subscription3 = default(IDisposable);
  769. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  770. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  771. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  772. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  773. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  774. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  775. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  776. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  777. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  778. scheduler.Start();
  779. results1.Messages.AssertEqual(
  780. OnNext(300, 3),
  781. OnNext(300, 4),
  782. OnNext(340, 5),
  783. OnNext(410, 6),
  784. OnNext(520, 7)
  785. );
  786. results2.Messages.AssertEqual(
  787. OnNext(400, 3),
  788. OnNext(400, 4),
  789. OnNext(400, 5),
  790. OnNext(410, 6),
  791. OnNext(520, 7),
  792. OnError<int>(630, ex)
  793. );
  794. results3.Messages.AssertEqual(
  795. OnNext(900, 3),
  796. OnNext(900, 4),
  797. OnNext(900, 5),
  798. OnNext(900, 6),
  799. OnNext(900, 7),
  800. OnError<int>(900, ex)
  801. );
  802. }
  803. [TestMethod]
  804. public void Canceled_ReplayByTime()
  805. {
  806. var scheduler = new TestScheduler();
  807. var xs = scheduler.CreateHotObservable(
  808. OnCompleted<int>(630),
  809. OnNext(640, 9),
  810. OnCompleted<int>(650),
  811. OnError<int>(660, new Exception())
  812. );
  813. var subject = default(ReplaySubject<int>);
  814. var subscription = default(IDisposable);
  815. var results1 = scheduler.CreateObserver<int>();
  816. var subscription1 = default(IDisposable);
  817. var results2 = scheduler.CreateObserver<int>();
  818. var subscription2 = default(IDisposable);
  819. var results3 = scheduler.CreateObserver<int>();
  820. var subscription3 = default(IDisposable);
  821. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  822. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  823. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  824. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  825. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  826. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  827. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  828. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  829. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  830. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  831. scheduler.Start();
  832. results1.Messages.AssertEqual(
  833. );
  834. results2.Messages.AssertEqual(
  835. OnCompleted<int>(631)
  836. );
  837. results3.Messages.AssertEqual(
  838. OnCompleted<int>(901)
  839. );
  840. }
  841. [TestMethod]
  842. public void Canceled_ReplayOne()
  843. {
  844. var scheduler = new TestScheduler();
  845. var xs = scheduler.CreateHotObservable(
  846. OnCompleted<int>(630),
  847. OnNext(640, 9),
  848. OnCompleted<int>(650),
  849. OnError<int>(660, new Exception())
  850. );
  851. var subject = default(ReplaySubject<int>);
  852. var subscription = default(IDisposable);
  853. var results1 = scheduler.CreateObserver<int>();
  854. var subscription1 = default(IDisposable);
  855. var results2 = scheduler.CreateObserver<int>();
  856. var subscription2 = default(IDisposable);
  857. var results3 = scheduler.CreateObserver<int>();
  858. var subscription3 = default(IDisposable);
  859. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  860. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  861. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  862. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  863. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  864. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  865. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  866. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  867. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  868. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  869. scheduler.Start();
  870. results1.Messages.AssertEqual(
  871. );
  872. results2.Messages.AssertEqual(
  873. OnCompleted<int>(630)
  874. );
  875. results3.Messages.AssertEqual(
  876. OnCompleted<int>(900)
  877. );
  878. }
  879. [TestMethod]
  880. public void Canceled_ReplayMany()
  881. {
  882. var scheduler = new TestScheduler();
  883. var xs = scheduler.CreateHotObservable(
  884. OnCompleted<int>(630),
  885. OnNext(640, 9),
  886. OnCompleted<int>(650),
  887. OnError<int>(660, new Exception())
  888. );
  889. var subject = default(ReplaySubject<int>);
  890. var subscription = default(IDisposable);
  891. var results1 = scheduler.CreateObserver<int>();
  892. var subscription1 = default(IDisposable);
  893. var results2 = scheduler.CreateObserver<int>();
  894. var subscription2 = default(IDisposable);
  895. var results3 = scheduler.CreateObserver<int>();
  896. var subscription3 = default(IDisposable);
  897. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  898. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  899. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  900. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  901. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  902. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  903. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  904. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  905. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  906. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  907. scheduler.Start();
  908. results1.Messages.AssertEqual(
  909. );
  910. results2.Messages.AssertEqual(
  911. OnCompleted<int>(630)
  912. );
  913. results3.Messages.AssertEqual(
  914. OnCompleted<int>(900)
  915. );
  916. }
  917. [TestMethod]
  918. public void Canceled_ReplayAll()
  919. {
  920. var scheduler = new TestScheduler();
  921. var xs = scheduler.CreateHotObservable(
  922. OnCompleted<int>(630),
  923. OnNext(640, 9),
  924. OnCompleted<int>(650),
  925. OnError<int>(660, new Exception())
  926. );
  927. var subject = default(ReplaySubject<int>);
  928. var subscription = default(IDisposable);
  929. var results1 = scheduler.CreateObserver<int>();
  930. var subscription1 = default(IDisposable);
  931. var results2 = scheduler.CreateObserver<int>();
  932. var subscription2 = default(IDisposable);
  933. var results3 = scheduler.CreateObserver<int>();
  934. var subscription3 = default(IDisposable);
  935. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  936. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  937. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  938. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  939. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  940. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  941. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  942. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  943. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  944. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  945. scheduler.Start();
  946. results1.Messages.AssertEqual(
  947. );
  948. results2.Messages.AssertEqual(
  949. OnCompleted<int>(630)
  950. );
  951. results3.Messages.AssertEqual(
  952. OnCompleted<int>(900)
  953. );
  954. }
  955. [TestMethod]
  956. public void SubjectDisposed()
  957. {
  958. var scheduler = new TestScheduler();
  959. var subject = default(ReplaySubject<int>);
  960. var results1 = scheduler.CreateObserver<int>();
  961. var subscription1 = default(IDisposable);
  962. var results2 = scheduler.CreateObserver<int>();
  963. var subscription2 = default(IDisposable);
  964. var results3 = scheduler.CreateObserver<int>();
  965. var subscription3 = default(IDisposable);
  966. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(scheduler));
  967. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  968. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  969. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  970. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  971. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  972. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  973. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  974. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  975. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  976. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  977. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  978. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  979. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  980. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  981. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  982. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  983. scheduler.Start();
  984. results1.Messages.AssertEqual(
  985. OnNext(201, 1),
  986. OnNext(251, 2),
  987. OnNext(351, 3),
  988. OnNext(451, 4)
  989. );
  990. results2.Messages.AssertEqual(
  991. OnNext(301, 1),
  992. OnNext(302, 2),
  993. OnNext(351, 3),
  994. OnNext(451, 4),
  995. OnNext(551, 5)
  996. );
  997. results3.Messages.AssertEqual(
  998. OnNext(401, 1),
  999. OnNext(402, 2),
  1000. OnNext(403, 3),
  1001. OnNext(451, 4),
  1002. OnNext(551, 5)
  1003. );
  1004. }
  1005. [TestMethod]
  1006. public void SubjectDisposed_ReplayOne()
  1007. {
  1008. var scheduler = new TestScheduler();
  1009. var subject = default(ReplaySubject<int>);
  1010. var results1 = scheduler.CreateObserver<int>();
  1011. var subscription1 = default(IDisposable);
  1012. var results2 = scheduler.CreateObserver<int>();
  1013. var subscription2 = default(IDisposable);
  1014. var results3 = scheduler.CreateObserver<int>();
  1015. var subscription3 = default(IDisposable);
  1016. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  1017. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1018. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1019. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1020. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1021. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1022. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1023. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1024. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1025. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1026. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1027. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1028. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1029. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1030. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1031. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1032. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1033. scheduler.Start();
  1034. results1.Messages.AssertEqual(
  1035. OnNext(200, 1),
  1036. OnNext(250, 2),
  1037. OnNext(350, 3),
  1038. OnNext(450, 4)
  1039. );
  1040. results2.Messages.AssertEqual(
  1041. OnNext(300, 2),
  1042. OnNext(350, 3),
  1043. OnNext(450, 4),
  1044. OnNext(550, 5)
  1045. );
  1046. results3.Messages.AssertEqual(
  1047. OnNext(400, 3),
  1048. OnNext(450, 4),
  1049. OnNext(550, 5)
  1050. );
  1051. }
  1052. [TestMethod]
  1053. public void SubjectDisposed_ReplayMany()
  1054. {
  1055. var scheduler = new TestScheduler();
  1056. var subject = default(ReplaySubject<int>);
  1057. var results1 = scheduler.CreateObserver<int>();
  1058. var subscription1 = default(IDisposable);
  1059. var results2 = scheduler.CreateObserver<int>();
  1060. var subscription2 = default(IDisposable);
  1061. var results3 = scheduler.CreateObserver<int>();
  1062. var subscription3 = default(IDisposable);
  1063. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  1064. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1065. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1066. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1067. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1068. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1069. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1070. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1071. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1072. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1073. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1074. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1075. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1076. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1077. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1078. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1079. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1080. scheduler.Start();
  1081. results1.Messages.AssertEqual(
  1082. OnNext(200, 1),
  1083. OnNext(250, 2),
  1084. OnNext(350, 3),
  1085. OnNext(450, 4)
  1086. );
  1087. results2.Messages.AssertEqual(
  1088. OnNext(300, 1),
  1089. OnNext(300, 2),
  1090. OnNext(350, 3),
  1091. OnNext(450, 4),
  1092. OnNext(550, 5)
  1093. );
  1094. results3.Messages.AssertEqual(
  1095. OnNext(400, 1),
  1096. OnNext(400, 2),
  1097. OnNext(400, 3),
  1098. OnNext(450, 4),
  1099. OnNext(550, 5)
  1100. );
  1101. }
  1102. [TestMethod]
  1103. public void SubjectDisposed_ReplayAll()
  1104. {
  1105. var scheduler = new TestScheduler();
  1106. var subject = default(ReplaySubject<int>);
  1107. var results1 = scheduler.CreateObserver<int>();
  1108. var subscription1 = default(IDisposable);
  1109. var results2 = scheduler.CreateObserver<int>();
  1110. var subscription2 = default(IDisposable);
  1111. var results3 = scheduler.CreateObserver<int>();
  1112. var subscription3 = default(IDisposable);
  1113. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  1114. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1115. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1116. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1117. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1118. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1119. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1120. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1121. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1122. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1123. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1124. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1125. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1126. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1127. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1128. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1129. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1130. scheduler.Start();
  1131. results1.Messages.AssertEqual(
  1132. OnNext(200, 1),
  1133. OnNext(250, 2),
  1134. OnNext(350, 3),
  1135. OnNext(450, 4)
  1136. );
  1137. results2.Messages.AssertEqual(
  1138. OnNext(300, 1),
  1139. OnNext(300, 2),
  1140. OnNext(350, 3),
  1141. OnNext(450, 4),
  1142. OnNext(550, 5)
  1143. );
  1144. results3.Messages.AssertEqual(
  1145. OnNext(400, 1),
  1146. OnNext(400, 2),
  1147. OnNext(400, 3),
  1148. OnNext(450, 4),
  1149. OnNext(550, 5)
  1150. );
  1151. }
  1152. //
  1153. // TODO: Create a failing test for this for the other implementations (ReplayOne/Many/All).
  1154. // I don't understand the behavior.
  1155. // I think it may have to do with calling Trim() on Subscription (as well as in the OnNext calls). -LC
  1156. //
  1157. [TestMethod]
  1158. public void ReplaySubjectDiesOut()
  1159. {
  1160. //
  1161. // Tests v1.x behavior as documented in ReplaySubject.cs (Subscribe method).
  1162. //
  1163. var scheduler = new TestScheduler();
  1164. var xs = scheduler.CreateHotObservable(
  1165. OnNext(70, 1),
  1166. OnNext(110, 2),
  1167. OnNext(220, 3),
  1168. OnNext(270, 4),
  1169. OnNext(340, 5),
  1170. OnNext(410, 6),
  1171. OnNext(520, 7),
  1172. OnCompleted<int>(580)
  1173. );
  1174. var subject = default(ReplaySubject<int>);
  1175. var results1 = scheduler.CreateObserver<int>();
  1176. var results2 = scheduler.CreateObserver<int>();
  1177. var results3 = scheduler.CreateObserver<int>();
  1178. var results4 = scheduler.CreateObserver<int>();
  1179. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(int.MaxValue, TimeSpan.FromTicks(100), scheduler));
  1180. scheduler.ScheduleAbsolute(200, () => xs.Subscribe(subject));
  1181. scheduler.ScheduleAbsolute(300, () => subject.Subscribe(results1));
  1182. scheduler.ScheduleAbsolute(400, () => subject.Subscribe(results2));
  1183. scheduler.ScheduleAbsolute(600, () => subject.Subscribe(results3));
  1184. scheduler.ScheduleAbsolute(900, () => subject.Subscribe(results4));
  1185. scheduler.Start();
  1186. results1.Messages.AssertEqual(
  1187. OnNext(301, 3),
  1188. OnNext(302, 4),
  1189. OnNext(341, 5),
  1190. OnNext(411, 6),
  1191. OnNext(521, 7),
  1192. OnCompleted<int>(581)
  1193. );
  1194. results2.Messages.AssertEqual(
  1195. OnNext(401, 5),
  1196. OnNext(411, 6),
  1197. OnNext(521, 7),
  1198. OnCompleted<int>(581)
  1199. );
  1200. results3.Messages.AssertEqual(
  1201. OnNext(601, 7),
  1202. OnCompleted<int>(602)
  1203. );
  1204. results4.Messages.AssertEqual(
  1205. OnCompleted<int>(901)
  1206. );
  1207. }
  1208. [TestMethod]
  1209. public void HasObservers()
  1210. {
  1211. HasObserversImpl(new ReplaySubject<int>());
  1212. HasObserversImpl(new ReplaySubject<int>(1));
  1213. HasObserversImpl(new ReplaySubject<int>(3));
  1214. HasObserversImpl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1215. }
  1216. private static void HasObserversImpl(ReplaySubject<int> s)
  1217. {
  1218. Assert.False(s.HasObservers);
  1219. var d1 = s.Subscribe(_ => { });
  1220. Assert.True(s.HasObservers);
  1221. d1.Dispose();
  1222. Assert.False(s.HasObservers);
  1223. var d2 = s.Subscribe(_ => { });
  1224. Assert.True(s.HasObservers);
  1225. var d3 = s.Subscribe(_ => { });
  1226. Assert.True(s.HasObservers);
  1227. d2.Dispose();
  1228. Assert.True(s.HasObservers);
  1229. d3.Dispose();
  1230. Assert.False(s.HasObservers);
  1231. }
  1232. [TestMethod]
  1233. public void HasObservers_Dispose1()
  1234. {
  1235. HasObservers_Dispose1Impl(new ReplaySubject<int>());
  1236. HasObservers_Dispose1Impl(new ReplaySubject<int>(1));
  1237. HasObservers_Dispose1Impl(new ReplaySubject<int>(3));
  1238. HasObservers_Dispose1Impl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1239. }
  1240. private static void HasObservers_Dispose1Impl(ReplaySubject<int> s)
  1241. {
  1242. Assert.False(s.HasObservers);
  1243. Assert.False(s.IsDisposed);
  1244. var d = s.Subscribe(_ => { });
  1245. Assert.True(s.HasObservers);
  1246. Assert.False(s.IsDisposed);
  1247. s.Dispose();
  1248. Assert.False(s.HasObservers);
  1249. Assert.True(s.IsDisposed);
  1250. d.Dispose();
  1251. Assert.False(s.HasObservers);
  1252. Assert.True(s.IsDisposed);
  1253. }
  1254. [TestMethod]
  1255. public void HasObservers_Dispose2()
  1256. {
  1257. HasObservers_Dispose2Impl(new ReplaySubject<int>());
  1258. HasObservers_Dispose2Impl(new ReplaySubject<int>(1));
  1259. HasObservers_Dispose2Impl(new ReplaySubject<int>(3));
  1260. HasObservers_Dispose2Impl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1261. }
  1262. private static void HasObservers_Dispose2Impl(ReplaySubject<int> s)
  1263. {
  1264. Assert.False(s.HasObservers);
  1265. Assert.False(s.IsDisposed);
  1266. var d = s.Subscribe(_ => { });
  1267. Assert.True(s.HasObservers);
  1268. Assert.False(s.IsDisposed);
  1269. d.Dispose();
  1270. Assert.False(s.HasObservers);
  1271. Assert.False(s.IsDisposed);
  1272. s.Dispose();
  1273. Assert.False(s.HasObservers);
  1274. Assert.True(s.IsDisposed);
  1275. }
  1276. [TestMethod]
  1277. public void HasObservers_Dispose3()
  1278. {
  1279. HasObservers_Dispose3Impl(new ReplaySubject<int>());
  1280. HasObservers_Dispose3Impl(new ReplaySubject<int>(1));
  1281. HasObservers_Dispose3Impl(new ReplaySubject<int>(3));
  1282. HasObservers_Dispose3Impl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1283. }
  1284. private static void HasObservers_Dispose3Impl(ReplaySubject<int> s)
  1285. {
  1286. Assert.False(s.HasObservers);
  1287. Assert.False(s.IsDisposed);
  1288. s.Dispose();
  1289. Assert.False(s.HasObservers);
  1290. Assert.True(s.IsDisposed);
  1291. }
  1292. [TestMethod]
  1293. public void HasObservers_OnCompleted()
  1294. {
  1295. HasObservers_OnCompletedImpl(new ReplaySubject<int>());
  1296. HasObservers_OnCompletedImpl(new ReplaySubject<int>(1));
  1297. HasObservers_OnCompletedImpl(new ReplaySubject<int>(3));
  1298. HasObservers_OnCompletedImpl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1299. }
  1300. private static void HasObservers_OnCompletedImpl(ReplaySubject<int> s)
  1301. {
  1302. Assert.False(s.HasObservers);
  1303. var d = s.Subscribe(_ => { });
  1304. Assert.True(s.HasObservers);
  1305. s.OnNext(42);
  1306. Assert.True(s.HasObservers);
  1307. s.OnCompleted();
  1308. Assert.False(s.HasObservers);
  1309. }
  1310. [TestMethod]
  1311. public void HasObservers_OnError()
  1312. {
  1313. HasObservers_OnErrorImpl(new ReplaySubject<int>());
  1314. HasObservers_OnErrorImpl(new ReplaySubject<int>(1));
  1315. HasObservers_OnErrorImpl(new ReplaySubject<int>(3));
  1316. HasObservers_OnErrorImpl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1317. }
  1318. private static void HasObservers_OnErrorImpl(ReplaySubject<int> s)
  1319. {
  1320. Assert.False(s.HasObservers);
  1321. var d = s.Subscribe(_ => { }, ex => { });
  1322. Assert.True(s.HasObservers);
  1323. s.OnNext(42);
  1324. Assert.True(s.HasObservers);
  1325. s.OnError(new Exception());
  1326. Assert.False(s.HasObservers);
  1327. }
  1328. [TestMethod]
  1329. public void Completed_to_late_subscriber_ReplayAll()
  1330. {
  1331. var s = new ReplaySubject<int>();
  1332. s.OnNext(1);
  1333. s.OnNext(2);
  1334. s.OnCompleted();
  1335. var scheduler = new TestScheduler();
  1336. var observer = scheduler.CreateObserver<int>();
  1337. s.Subscribe(observer);
  1338. Assert.Equal(3, observer.Messages.Count);
  1339. Assert.Equal(1, observer.Messages[0].Value.Value);
  1340. Assert.Equal(2, observer.Messages[1].Value.Value);
  1341. Assert.Equal(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind);
  1342. }
  1343. [TestMethod]
  1344. public void Completed_to_late_subscriber_ReplayOne()
  1345. {
  1346. var s = new ReplaySubject<int>(1);
  1347. s.OnNext(1);
  1348. s.OnNext(2);
  1349. s.OnCompleted();
  1350. var scheduler = new TestScheduler();
  1351. var observer = scheduler.CreateObserver<int>();
  1352. s.Subscribe(observer);
  1353. Assert.Equal(2, observer.Messages.Count);
  1354. Assert.Equal(2, observer.Messages[0].Value.Value);
  1355. Assert.Equal(NotificationKind.OnCompleted, observer.Messages[1].Value.Kind);
  1356. }
  1357. [TestMethod]
  1358. public void Completed_to_late_subscriber_ReplayMany()
  1359. {
  1360. var s = new ReplaySubject<int>(2);
  1361. s.OnNext(1);
  1362. s.OnNext(2);
  1363. s.OnNext(3);
  1364. s.OnCompleted();
  1365. var scheduler = new TestScheduler();
  1366. var observer = scheduler.CreateObserver<int>();
  1367. s.Subscribe(observer);
  1368. Assert.Equal(3, observer.Messages.Count);
  1369. Assert.Equal(2, observer.Messages[0].Value.Value);
  1370. Assert.Equal(3, observer.Messages[1].Value.Value);
  1371. Assert.Equal(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind);
  1372. }
  1373. [TestMethod]
  1374. public void Completed_to_late_subscriber_ReplayByTime()
  1375. {
  1376. var s = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
  1377. s.OnNext(1);
  1378. s.OnNext(2);
  1379. s.OnNext(3);
  1380. s.OnCompleted();
  1381. var scheduler = new TestScheduler();
  1382. var observer = scheduler.CreateObserver<int>();
  1383. s.Subscribe(observer);
  1384. Assert.Equal(4, observer.Messages.Count);
  1385. Assert.Equal(1, observer.Messages[0].Value.Value);
  1386. Assert.Equal(2, observer.Messages[1].Value.Value);
  1387. Assert.Equal(3, observer.Messages[2].Value.Value);
  1388. Assert.Equal(NotificationKind.OnCompleted, observer.Messages[3].Value.Kind);
  1389. }
  1390. [TestMethod]
  1391. public void Errored_to_late_subscriber_ReplayAll()
  1392. {
  1393. var expectedException = new Exception("Test");
  1394. var s = new ReplaySubject<int>();
  1395. s.OnNext(1);
  1396. s.OnNext(2);
  1397. s.OnError(expectedException);
  1398. var scheduler = new TestScheduler();
  1399. var observer = scheduler.CreateObserver<int>();
  1400. s.Subscribe(observer);
  1401. Assert.Equal(3, observer.Messages.Count);
  1402. Assert.Equal(1, observer.Messages[0].Value.Value);
  1403. Assert.Equal(2, observer.Messages[1].Value.Value);
  1404. Assert.Equal(NotificationKind.OnError, observer.Messages[2].Value.Kind);
  1405. Assert.Equal(expectedException, observer.Messages[2].Value.Exception);
  1406. }
  1407. [TestMethod]
  1408. public void Errored_to_late_subscriber_ReplayOne()
  1409. {
  1410. var expectedException = new Exception("Test");
  1411. var s = new ReplaySubject<int>(1);
  1412. s.OnNext(1);
  1413. s.OnNext(2);
  1414. s.OnError(expectedException);
  1415. var scheduler = new TestScheduler();
  1416. var observer = scheduler.CreateObserver<int>();
  1417. s.Subscribe(observer);
  1418. Assert.Equal(2, observer.Messages.Count);
  1419. Assert.Equal(2, observer.Messages[0].Value.Value);
  1420. Assert.Equal(NotificationKind.OnError, observer.Messages[1].Value.Kind);
  1421. Assert.Equal(expectedException, observer.Messages[1].Value.Exception);
  1422. }
  1423. [TestMethod]
  1424. public void Errored_to_late_subscriber_ReplayMany()
  1425. {
  1426. var expectedException = new Exception("Test");
  1427. var s = new ReplaySubject<int>(2);
  1428. s.OnNext(1);
  1429. s.OnNext(2);
  1430. s.OnNext(3);
  1431. s.OnError(expectedException);
  1432. var scheduler = new TestScheduler();
  1433. var observer = scheduler.CreateObserver<int>();
  1434. s.Subscribe(observer);
  1435. Assert.Equal(3, observer.Messages.Count);
  1436. Assert.Equal(2, observer.Messages[0].Value.Value);
  1437. Assert.Equal(3, observer.Messages[1].Value.Value);
  1438. Assert.Equal(NotificationKind.OnError, observer.Messages[2].Value.Kind);
  1439. Assert.Equal(expectedException, observer.Messages[2].Value.Exception);
  1440. }
  1441. [TestMethod]
  1442. public void Errored_to_late_subscriber_ReplayByTime()
  1443. {
  1444. var expectedException = new Exception("Test");
  1445. var s = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
  1446. s.OnNext(1);
  1447. s.OnNext(2);
  1448. s.OnNext(3);
  1449. s.OnError(expectedException);
  1450. var scheduler = new TestScheduler();
  1451. var observer = scheduler.CreateObserver<int>();
  1452. s.Subscribe(observer);
  1453. Assert.Equal(4, observer.Messages.Count);
  1454. Assert.Equal(1, observer.Messages[0].Value.Value);
  1455. Assert.Equal(2, observer.Messages[1].Value.Value);
  1456. Assert.Equal(3, observer.Messages[2].Value.Value);
  1457. Assert.Equal(NotificationKind.OnError, observer.Messages[3].Value.Kind);
  1458. Assert.Equal(expectedException, observer.Messages[3].Value.Exception);
  1459. }
  1460. [TestMethod]
  1461. public void ReplaySubject_Reentrant()
  1462. {
  1463. var r = new ReplaySubject<int>(4);
  1464. r.OnNext(0);
  1465. r.OnNext(1);
  1466. r.OnNext(2);
  1467. r.OnNext(3);
  1468. r.OnNext(4);
  1469. var xs = new List<int>();
  1470. var i = 0;
  1471. r.Subscribe(x =>
  1472. {
  1473. xs.Add(x);
  1474. if (++i <= 10)
  1475. {
  1476. r.OnNext(x);
  1477. }
  1478. });
  1479. r.OnNext(5);
  1480. Assert.True(xs.SequenceEqual(
  1481. [
  1482. 1, 2, 3, 4, // original
  1483. 1, 2, 3, 4, // reentrant (+ fed back)
  1484. 1, 2, 3, 4, // reentrant (+ first two fed back)
  1485. 1, 2, // reentrant
  1486. 5 // tune in
  1487. ]));
  1488. }
  1489. #if !NO_INTERNALSTEST
  1490. [TestMethod]
  1491. public void FastImmediateObserver_Simple1()
  1492. {
  1493. var res = FastImmediateObserverTest(fio =>
  1494. {
  1495. fio.OnNext(1);
  1496. fio.OnNext(2);
  1497. fio.OnNext(3);
  1498. fio.OnCompleted();
  1499. fio.EnsureActive(4);
  1500. });
  1501. res.AssertEqual(
  1502. OnNext(0, 1),
  1503. OnNext(1, 2),
  1504. OnNext(2, 3),
  1505. OnCompleted<int>(3)
  1506. );
  1507. }
  1508. [TestMethod]
  1509. public void FastImmediateObserver_Simple2()
  1510. {
  1511. var ex = new Exception();
  1512. var res = FastImmediateObserverTest(fio =>
  1513. {
  1514. fio.OnNext(1);
  1515. fio.OnNext(2);
  1516. fio.OnNext(3);
  1517. fio.OnError(ex);
  1518. fio.EnsureActive(4);
  1519. });
  1520. res.AssertEqual(
  1521. OnNext(0, 1),
  1522. OnNext(1, 2),
  1523. OnNext(2, 3),
  1524. OnError<int>(3, ex)
  1525. );
  1526. }
  1527. [TestMethod]
  1528. public void FastImmediateObserver_Simple3()
  1529. {
  1530. var res = FastImmediateObserverTest(fio =>
  1531. {
  1532. fio.OnNext(1);
  1533. fio.EnsureActive();
  1534. fio.OnNext(2);
  1535. fio.EnsureActive();
  1536. fio.OnNext(3);
  1537. fio.EnsureActive();
  1538. fio.OnCompleted();
  1539. fio.EnsureActive();
  1540. });
  1541. res.AssertEqual(
  1542. OnNext(0, 1),
  1543. OnNext(1, 2),
  1544. OnNext(2, 3),
  1545. OnCompleted<int>(3)
  1546. );
  1547. }
  1548. [TestMethod]
  1549. public void FastImmediateObserver_Fault()
  1550. {
  1551. var xs = new List<int>();
  1552. var o = Observer.Create<int>(
  1553. x => { xs.Add(x); if (x == 2) { throw new Exception(); } },
  1554. ex => { },
  1555. () => { }
  1556. );
  1557. var fio = new FastImmediateObserver<int>(o);
  1558. fio.OnNext(1);
  1559. fio.OnNext(2);
  1560. fio.OnNext(3);
  1561. ReactiveAssert.Throws<Exception>(() => fio.EnsureActive());
  1562. fio.OnNext(4);
  1563. fio.EnsureActive();
  1564. fio.OnNext(2);
  1565. fio.EnsureActive();
  1566. Assert.True(xs.Count == 2);
  1567. }
  1568. [TestMethod]
  1569. public void FastImmediateObserver_Ownership1()
  1570. {
  1571. var xs = new List<int>();
  1572. var o = Observer.Create<int>(
  1573. xs.Add,
  1574. ex => { },
  1575. () => { }
  1576. );
  1577. var fio = new FastImmediateObserver<int>(o);
  1578. var ts = new Task[16];
  1579. var N = 100;
  1580. for (var i = 0; i < ts.Length; i++)
  1581. {
  1582. var j = i;
  1583. ts[i] = Task.Factory.StartNew(() =>
  1584. {
  1585. for (var k = 0; k < N; k++)
  1586. {
  1587. fio.OnNext(j * N + k);
  1588. }
  1589. fio.EnsureActive(N);
  1590. });
  1591. }
  1592. Task.WaitAll(ts);
  1593. Assert.True(xs.Count == ts.Length * N);
  1594. }
  1595. [TestMethod]
  1596. public void FastImmediateObserver_Ownership2()
  1597. {
  1598. var cd = new CountdownEvent(3);
  1599. var w = new ManualResetEvent(false);
  1600. var e = new ManualResetEvent(false);
  1601. var xs = new List<int>();
  1602. var o = Observer.Create<int>(
  1603. x => { xs.Add(x); w.Set(); e.WaitOne(); cd.Signal(); },
  1604. ex => { },
  1605. () => { }
  1606. );
  1607. var fio = new FastImmediateObserver<int>(o);
  1608. fio.OnNext(1);
  1609. var t = Task.Factory.StartNew(() =>
  1610. {
  1611. fio.EnsureActive();
  1612. });
  1613. w.WaitOne();
  1614. fio.OnNext(2);
  1615. fio.OnNext(3);
  1616. fio.EnsureActive(2);
  1617. e.Set();
  1618. cd.Wait();
  1619. Assert.True(xs.Count == 3);
  1620. }
  1621. private IEnumerable<Recorded<Notification<int>>> FastImmediateObserverTest(Action<IScheduledObserver<int>> f)
  1622. {
  1623. var ns = new List<Recorded<Notification<int>>>();
  1624. var l = 0L;
  1625. var o = Observer.Create<int>(
  1626. x => { ns.Add(OnNext(l++, x)); },
  1627. ex => { ns.Add(OnError<int>(l++, ex)); },
  1628. () => { ns.Add(OnCompleted<int>(l++)); }
  1629. );
  1630. var fio = new FastImmediateObserver<int>(o);
  1631. f(fio);
  1632. return ns;
  1633. }
  1634. #endif
  1635. }
  1636. }