ReplayTest.cs 36 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. using Assert = Xunit.Assert;
  13. namespace ReactiveTests.Tests
  14. {
  15. [TestClass]
  16. public class ReplayTest : ReactiveTest
  17. {
  /// <summary>
  /// Checks that every <c>Observable.Replay</c> overload exercised below rejects
  /// invalid arguments: null source/selector/scheduler with
  /// <see cref="ArgumentNullException"/>, and negative buffer sizes or windows with
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  /// <remarks>
  /// Each assertion passes exactly one invalid argument so that the expected
  /// exception type does not depend on the order in which the operator validates
  /// its parameters.
  /// </remarks>
  [TestMethod]
  public void Replay_ArgumentChecking()
  {
      var someObservable = Observable.Empty<int>();
      var scheduler = new TestScheduler();

      // Replay(source) / Replay(source, selector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null));

      // Replay(source, scheduler) / Replay(source, selector, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int>(null, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(DummyObservable<int>.Instance, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(null, DummyFunc<IObservable<int>, IObservable<int>>.Instance, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(DummyObservable<int>.Instance, null, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(DummyObservable<int>.Instance, DummyFunc<IObservable<int>, IObservable<int>>.Instance, null));

      // Replay(source, window) / Replay(source, selector, window)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(-1)));

      // Replay(source, window, scheduler) / Replay(source, selector, window, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(1), default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(1), default));

      // Replay(source, bufferSize, scheduler) / Replay(source, selector, bufferSize, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, default(IScheduler)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, scheduler));
      // Only the selector is invalid here; a valid buffer size keeps this case
      // independent of validation order and in line with the other null-selector cases.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, default(IScheduler)));

      // Replay(source, bufferSize) / Replay(source, selector, bufferSize)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2));

      // Replay(source, bufferSize, window) / Replay(source, selector, bufferSize, window)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1)));

      // Replay(source, bufferSize, window, scheduler) / Replay(source, selector, bufferSize, window, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(1), null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(1), null));
  }
  /// <summary>
  /// Replays with a buffer size of 3 while the connection is opened and closed
  /// three times. An observer subscribing at 450 is expected to record 5, 6, 7
  /// (at 451-453) followed by 11 at 521, and the source is expected to show one
  /// subscription per connection window.
  /// </summary>
  [TestMethod]
  public void ReplayCount_Basic()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replay = null;
      IDisposable observerHandle = null;
      IDisposable connectionHandle = null;
      var observer = clock.CreateObserver<int>();

      // Create the operator, then script connections and the single subscription.
      clock.ScheduleAbsolute(Created, () => replay = source.Replay(3, clock));

      clock.ScheduleAbsolute(300, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(400, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(450, () => observerHandle = replay.Subscribe(observer));

      clock.ScheduleAbsolute(500, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(550, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(650, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(800, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(Disposed, () => observerHandle.Dispose());

      clock.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7),
          OnNext(521, 11)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  /// <summary>
  /// Replays with a buffer size of 3 over a source that fails at 600. The
  /// observer subscribing at 450 is expected to record 5, 6, 7, then 11 and 20
  /// from the second connection, and finally the error at 601.
  /// </summary>
  [TestMethod]
  public void ReplayCount_Error()
  {
      var failure = new Exception();
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, failure)
      );

      IConnectableObservable<int> replay = null;
      IDisposable observerHandle = null;
      IDisposable connectionHandle = null;
      var observer = clock.CreateObserver<int>();

      clock.ScheduleAbsolute(Created, () => replay = source.Replay(3, clock));

      // First connection window.
      clock.ScheduleAbsolute(300, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(400, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(450, () => observerHandle = replay.Subscribe(observer));

      // Second connection window; the source errors while it is open.
      clock.ScheduleAbsolute(500, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(800, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(Disposed, () => observerHandle.Dispose());

      clock.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnError<int>(601, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  /// <summary>
  /// Replays with a buffer size of 3 over a source that completes at 600. The
  /// observer subscribing at 450 is expected to record 5, 6, 7, then 11 and 20
  /// from the second connection, and finally completion at 601.
  /// </summary>
  [TestMethod]
  public void ReplayCount_Complete()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replay = null;
      IDisposable observerHandle = null;
      IDisposable connectionHandle = null;
      var observer = clock.CreateObserver<int>();

      clock.ScheduleAbsolute(Created, () => replay = source.Replay(3, clock));

      // First connection window.
      clock.ScheduleAbsolute(300, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(400, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(450, () => observerHandle = replay.Subscribe(observer));

      // Second connection window; the source completes while it is open.
      clock.ScheduleAbsolute(500, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(800, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(Disposed, () => observerHandle.Dispose());

      clock.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnCompleted<int>(601)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  /// <summary>
  /// Replays with a buffer size of 3 where the observer subscribes at 450 and
  /// unsubscribes at 475. Only the replayed values 5, 6, 7 (at 451-453) are
  /// expected; the source is still expected to show all three connection windows.
  /// </summary>
  [TestMethod]
  public void ReplayCount_Dispose()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replay = null;
      IDisposable observerHandle = null;
      IDisposable connectionHandle = null;
      var observer = clock.CreateObserver<int>();

      clock.ScheduleAbsolute(Created, () => replay = source.Replay(3, clock));

      clock.ScheduleAbsolute(300, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(400, () => connectionHandle.Dispose());

      // The observer is only attached between 450 and 475.
      clock.ScheduleAbsolute(450, () => observerHandle = replay.Subscribe(observer));
      clock.ScheduleAbsolute(475, () => observerHandle.Dispose());

      clock.ScheduleAbsolute(500, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(550, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(650, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(800, () => connectionHandle.Dispose());

      clock.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  /// <summary>
  /// Asserts that connecting twice to a count-bounded replay returns the same
  /// connection object, and that connecting again after disposal returns a
  /// different one.
  /// </summary>
  [TestMethod]
  public void ReplayCount_MultipleConnections()
  {
      var replay = Observable.Never<int>().Replay(3, new TestScheduler());

      var first = replay.Connect();
      var second = replay.Connect();
      Assert.Same(first, second);

      // Dispose through both references before reconnecting.
      first.Dispose();
      second.Dispose();

      var third = replay.Connect();
      Assert.NotSame(first, third);

      third.Dispose();
  }
  /// <summary>
  /// Runs the selector overload of Replay (buffer size 3) with a selector of
  /// <c>Take(6).Repeat()</c> over a source that completes at 600, disposing the
  /// result at 610, and checks the exact recorded messages and the single
  /// source subscription (200-600).
  /// </summary>
  [TestMethod]
  public void ReplayCountLambda_Zip_Complete()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      // Factory handed to the scheduler; it is invoked by Start, not here.
      Func<IObservable<int>> build =
          () => source.Replay(shared => shared.Take(6).Repeat(), 3, clock);

      var observer = clock.Start(build, 610);

      observer.Messages.AssertEqual(
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          OnNext(562, 9),
          OnNext(563, 11),
          OnNext(564, 20),
          OnNext(602, 9),
          OnNext(603, 11),
          OnNext(604, 20),
          OnNext(606, 9),
          OnNext(607, 11),
          OnNext(608, 20)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Runs the selector overload of Replay (buffer size 3) with a selector of
  /// <c>Take(6).Repeat()</c> over a source that fails at 600, and checks that the
  /// recorded messages end with that error at 601.
  /// </summary>
  [TestMethod]
  public void ReplayCountLambda_Zip_Error()
  {
      var failure = new Exception();
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, failure)
      );

      // Factory handed to the scheduler; it is invoked by Start, not here.
      Func<IObservable<int>> build =
          () => source.Replay(shared => shared.Take(6).Repeat(), 3, clock);

      var observer = clock.Start(build);

      observer.Messages.AssertEqual(
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          OnNext(562, 9),
          OnNext(563, 11),
          OnNext(564, 20),
          OnError<int>(601, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Runs the selector overload of Replay (buffer size 3) with a selector of
  /// <c>Take(6).Repeat()</c> and disposes the result at 470. The recorded
  /// messages are expected to stop at 451 and the source subscription to end
  /// at 470.
  /// </summary>
  [TestMethod]
  public void ReplayCountLambda_Zip_Dispose()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      // Factory handed to the scheduler; it is invoked by Start, not here.
      Func<IObservable<int>> build =
          () => source.Replay(shared => shared.Take(6).Repeat(), 3, clock);

      var observer = clock.Start(build, 470);

      observer.Messages.AssertEqual(
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 470)
      );
  }
  /// <summary>
  /// Replays with a 150-tick window while the connection is opened and closed
  /// three times. An observer subscribing at 450 is expected to record 8, 5, 6,
  /// 7 (at 451-454) followed by 11 at 521.
  /// </summary>
  [TestMethod]
  public void ReplayTime_Basic()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replay = null;
      IDisposable observerHandle = null;
      IDisposable connectionHandle = null;
      var observer = clock.CreateObserver<int>();

      clock.ScheduleAbsolute(Created, () => replay = source.Replay(TimeSpan.FromTicks(150), clock));

      clock.ScheduleAbsolute(300, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(400, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(450, () => observerHandle = replay.Subscribe(observer));

      clock.ScheduleAbsolute(500, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(550, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(650, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(800, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(Disposed, () => observerHandle.Dispose());

      clock.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 8),
          OnNext(452, 5),
          OnNext(453, 6),
          OnNext(454, 7),
          OnNext(521, 11)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  /// <summary>
  /// Replays with a 75-tick window over a source that fails at 600. The observer
  /// subscribing at 450 is expected to record 7 at 451, then 11 and 20 from the
  /// second connection, and finally the error at 601.
  /// </summary>
  [TestMethod]
  public void ReplayTime_Error()
  {
      var failure = new Exception();
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, failure)
      );

      IConnectableObservable<int> replay = null;
      IDisposable observerHandle = null;
      IDisposable connectionHandle = null;
      var observer = clock.CreateObserver<int>();

      clock.ScheduleAbsolute(Created, () => replay = source.Replay(TimeSpan.FromTicks(75), clock));

      // First connection window.
      clock.ScheduleAbsolute(300, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(400, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(450, () => observerHandle = replay.Subscribe(observer));

      // Second connection window; the source errors while it is open.
      clock.ScheduleAbsolute(500, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(800, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(Disposed, () => observerHandle.Dispose());

      clock.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnError<int>(601, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  /// <summary>
  /// Replays with an 85-tick window over a source that completes at 600. The
  /// observer subscribing at 450 is expected to record 6 and 7 (at 451-452),
  /// then 11 and 20 from the second connection, and finally completion at 601.
  /// </summary>
  [TestMethod]
  public void ReplayTime_Complete()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replay = null;
      IDisposable observerHandle = null;
      IDisposable connectionHandle = null;
      var observer = clock.CreateObserver<int>();

      clock.ScheduleAbsolute(Created, () => replay = source.Replay(TimeSpan.FromTicks(85), clock));

      // First connection window.
      clock.ScheduleAbsolute(300, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(400, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(450, () => observerHandle = replay.Subscribe(observer));

      // Second connection window; the source completes while it is open.
      clock.ScheduleAbsolute(500, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(800, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(Disposed, () => observerHandle.Dispose());

      clock.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 6),
          OnNext(452, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnCompleted<int>(601)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  /// <summary>
  /// Replays with a 100-tick window where the observer subscribes at 450 and
  /// unsubscribes at 475. Only the replayed values 5, 6, 7 (at 451-453) are
  /// expected; the source is still expected to show all three connection windows.
  /// </summary>
  [TestMethod]
  public void ReplayTime_Dispose()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replay = null;
      IDisposable observerHandle = null;
      IDisposable connectionHandle = null;
      var observer = clock.CreateObserver<int>();

      clock.ScheduleAbsolute(Created, () => replay = source.Replay(TimeSpan.FromTicks(100), clock));

      clock.ScheduleAbsolute(300, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(400, () => connectionHandle.Dispose());

      // The observer is only attached between 450 and 475.
      clock.ScheduleAbsolute(450, () => observerHandle = replay.Subscribe(observer));
      clock.ScheduleAbsolute(475, () => observerHandle.Dispose());

      clock.ScheduleAbsolute(500, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(550, () => connectionHandle.Dispose());

      clock.ScheduleAbsolute(650, () => connectionHandle = replay.Connect());
      clock.ScheduleAbsolute(800, () => connectionHandle.Dispose());

      clock.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  /// <summary>
  /// Asserts that connecting twice to a time-bounded replay returns the same
  /// connection object, and that connecting again after disposal returns a
  /// different one.
  /// </summary>
  [TestMethod]
  public void ReplayTime_MultipleConnections()
  {
      var replay = Observable.Never<int>().Replay(TimeSpan.FromTicks(100), new TestScheduler());

      var first = replay.Connect();
      var second = replay.Connect();
      Assert.Same(first, second);

      // Dispose through both references before reconnecting.
      first.Dispose();
      second.Dispose();

      var third = replay.Connect();
      Assert.NotSame(first, third);

      third.Dispose();
  }
  /// <summary>
  /// Runs the selector overload of Replay (50-tick window) with a selector of
  /// <c>Take(6).Repeat()</c> over a source that completes at 600, disposing the
  /// result at 610, and checks the exact recorded messages and the single
  /// source subscription (200-600).
  /// </summary>
  [TestMethod]
  public void ReplayTimeLambda_Zip_Complete()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      // Factory handed to the scheduler; it is invoked by Start, not here.
      Func<IObservable<int>> build =
          () => source.Replay(shared => shared.Take(6).Repeat(), TimeSpan.FromTicks(50), clock);

      var observer = clock.Start(build, 610);

      observer.Messages.AssertEqual(
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          OnNext(562, 11),
          OnNext(563, 20),
          OnNext(602, 20),
          OnNext(604, 20),
          OnNext(606, 20),
          OnNext(608, 20)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Runs the selector overload of Replay (50-tick window) with a selector of
  /// <c>Take(6).Repeat()</c> over a source that fails at 600, and checks that the
  /// recorded messages end with that error at 601.
  /// </summary>
  [TestMethod]
  public void ReplayTimeLambda_Zip_Error()
  {
      var failure = new Exception();
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, failure)
      );

      // Factory handed to the scheduler; it is invoked by Start, not here.
      Func<IObservable<int>> build =
          () => source.Replay(shared => shared.Take(6).Repeat(), TimeSpan.FromTicks(50), clock);

      var observer = clock.Start(build);

      observer.Messages.AssertEqual(
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          OnNext(562, 11),
          OnNext(563, 20),
          OnError<int>(601, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Runs the selector overload of Replay (50-tick window) with a selector of
  /// <c>Take(6).Repeat()</c> and disposes the result at 470. The recorded
  /// messages are expected to stop at 451 and the source subscription to end
  /// at 470.
  /// </summary>
  [TestMethod]
  public void ReplayTimeLambda_Zip_Dispose()
  {
      TestScheduler clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      // Factory handed to the scheduler; it is invoked by Start, not here.
      Func<IObservable<int>> build =
          () => source.Replay(shared => shared.Take(6).Repeat(), TimeSpan.FromTicks(50), clock);

      var observer = clock.Start(build, 470);

      observer.Messages.AssertEqual(
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 470)
      );
  }
  /// <summary>
  /// Asserts that <c>Replay(100)</c> and <c>Replay(100, DefaultScheduler.Instance)</c>
  /// over the same subject yield equal sequences after the subject emits 1, 2
  /// and completes.
  /// </summary>
  [TestMethod]
  public void Replay_Default1()
  {
      var subject = new Subject<int>();

      var withScheduler = subject.Replay(100, DefaultScheduler.Instance);
      var withoutScheduler = subject.Replay(100);

      // Both replays must be connected before the subject starts emitting.
      withScheduler.Connect();
      withoutScheduler.Connect();

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withScheduler.AssertEqual(withoutScheduler);
  }
  /// <summary>
  /// Asserts that the one-hour-window Replay overloads with and without
  /// <c>DefaultScheduler.Instance</c> yield equal sequences after the subject
  /// emits 1, 2 and completes.
  /// </summary>
  [TestMethod]
  public void Replay_Default2()
  {
      var subject = new Subject<int>();

      var withScheduler = subject.Replay(TimeSpan.FromHours(1), DefaultScheduler.Instance);
      var withoutScheduler = subject.Replay(TimeSpan.FromHours(1));

      // Both replays must be connected before the subject starts emitting.
      withScheduler.Connect();
      withoutScheduler.Connect();

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withScheduler.AssertEqual(withoutScheduler);
  }
  /// <summary>
  /// Asserts that the buffer-size-and-window Replay overloads (100 items, one
  /// hour) with and without <c>DefaultScheduler.Instance</c> yield equal
  /// sequences after the subject emits 1, 2 and completes.
  /// </summary>
  [TestMethod]
  public void Replay_Default3()
  {
      var subject = new Subject<int>();

      var withScheduler = subject.Replay(100, TimeSpan.FromHours(1), DefaultScheduler.Instance);
      var withoutScheduler = subject.Replay(100, TimeSpan.FromHours(1));

      // Both replays must be connected before the subject starts emitting.
      withScheduler.Connect();
      withoutScheduler.Connect();

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withScheduler.AssertEqual(withoutScheduler);
  }
  /// <summary>
  /// Asserts that <c>Replay()</c> and <c>Replay(DefaultScheduler.Instance)</c>
  /// over the same subject yield equal sequences after the subject emits 1, 2
  /// and completes.
  /// </summary>
  [TestMethod]
  public void Replay_Default4()
  {
      var subject = new Subject<int>();

      var withScheduler = subject.Replay(DefaultScheduler.Instance);
      var withoutScheduler = subject.Replay();

      // Both replays must be connected before the subject starts emitting.
      withScheduler.Connect();
      withoutScheduler.Connect();

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withScheduler.AssertEqual(withoutScheduler);
  }
  /// <summary>
  /// Asserts that the identity-selector Replay overloads with buffer size 100,
  /// with and without <c>DefaultScheduler.Instance</c>, yield equal sequences
  /// over <c>Range(1, 10)</c>.
  /// </summary>
  [TestMethod]
  public void ReplayLambda_Default1()
  {
      var withScheduler = Observable.Range(1, 10).Replay(shared => shared, 100, DefaultScheduler.Instance);
      var withoutScheduler = Observable.Range(1, 10).Replay(shared => shared, 100);

      withScheduler.AssertEqual(withoutScheduler);
  }
  /// <summary>
  /// Asserts that the identity-selector Replay overloads with a one-hour window,
  /// with and without <c>DefaultScheduler.Instance</c>, yield equal sequences
  /// over <c>Range(1, 10)</c>.
  /// </summary>
  [TestMethod]
  public void ReplayLambda_Default2()
  {
      var withScheduler = Observable.Range(1, 10).Replay(shared => shared, TimeSpan.FromHours(1), DefaultScheduler.Instance);
      var withoutScheduler = Observable.Range(1, 10).Replay(shared => shared, TimeSpan.FromHours(1));

      withScheduler.AssertEqual(withoutScheduler);
  }
  /// <summary>
  /// Asserts that the identity-selector Replay overloads with buffer size 100
  /// and a one-hour window, with and without <c>DefaultScheduler.Instance</c>,
  /// yield equal sequences over <c>Range(1, 10)</c>.
  /// </summary>
  [TestMethod]
  public void ReplayLambda_Default3()
  {
      var withScheduler = Observable.Range(1, 10).Replay(shared => shared, 100, TimeSpan.FromHours(1), DefaultScheduler.Instance);
      var withoutScheduler = Observable.Range(1, 10).Replay(shared => shared, 100, TimeSpan.FromHours(1));

      withScheduler.AssertEqual(withoutScheduler);
  }
  /// <summary>
  /// Asserts that the identity-selector Replay overloads with no bounds, with
  /// and without <c>DefaultScheduler.Instance</c>, yield equal sequences over
  /// <c>Range(1, 10)</c>.
  /// </summary>
  [TestMethod]
  public void ReplayLambda_Default4()
  {
      var withScheduler = Observable.Range(1, 10).Replay(shared => shared, DefaultScheduler.Instance);
      var withoutScheduler = Observable.Range(1, 10).Replay(shared => shared);

      withScheduler.AssertEqual(withoutScheduler);
  }
  848. }
  849. }