1
0

ReplayTest.cs 36 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Reactive.Subjects;
  17. namespace ReactiveTests.Tests
  18. {
  19. public class ReplayTest : ReactiveTest
  20. {
  /// <summary>
  /// Checks argument validation across the <c>Observable.Replay</c> overloads: null sources,
  /// selectors and schedulers are expected to raise <see cref="ArgumentNullException"/>, while
  /// negative buffer sizes and negative windows are expected to raise
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void Replay_ArgumentChecking()
  {
      var someObservable = Observable.Empty<int>();
      var scheduler = new TestScheduler();

      // Replay(source) and Replay(source, selector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null));

      // Replay(source, scheduler) and Replay(source, selector, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int>(null, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int>(DummyObservable<int>.Instance, (IScheduler)null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(null, DummyFunc<IObservable<int>, IObservable<int>>.Instance, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(DummyObservable<int>.Instance, null, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(DummyObservable<int>.Instance, DummyFunc<IObservable<int>, IObservable<int>>.Instance, null));

      // Replay(source, window)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1)));

      // Replay(source, selector, window)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay<int, int>(someObservable, x => x, TimeSpan.FromSeconds(-1)));

      // Replay(source, window, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(1), default(IScheduler)));

      // Replay(source, selector, window, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(1), default(IScheduler)));

      // Replay(source, bufferSize, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, default(IScheduler)));

      // Replay(source, selector, bufferSize, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, scheduler));
      // Use a valid buffer size here so that the null selector is the only invalid argument;
      // otherwise the outcome would depend on the order in which the arguments are validated.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, default(IScheduler)));

      // Replay(source, bufferSize)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2));

      // Replay(source, selector, bufferSize)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2));

      // Replay(source, bufferSize, window)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1)));

      // Replay(source, selector, bufferSize, window)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1)));

      // Replay(source, bufferSize, window, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(1), null));

      // Replay(source, selector, bufferSize, window, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(1), null));
  }
  /// <summary>
  /// Count-based replay (buffer size 3) over three connect/disconnect cycles. The observer
  /// subscribes at 450 and is expected to see the buffered values 5, 6, 7 on ticks 451-453,
  /// followed by the value 11 at 521; it stays subscribed until <c>Disposed</c>.
  /// </summary>
  [Fact]
  public void ReplayCount_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Timeline in chronological order; every scheduled time below is distinct.
      testScheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(3, testScheduler));
      testScheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      testScheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(550, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(650, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      testScheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7),
          OnNext(521, 11)
      };
      observer.Messages.AssertEqual(expectedMessages);

      var expectedSubscriptions = new[]
      {
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      };
      source.Subscriptions.AssertEqual(expectedSubscriptions);
  }
  /// <summary>
  /// Count-based replay (buffer size 3) where the source fails at 600 while the second
  /// connection is still active. The observer is expected to see the buffered values 5, 6, 7,
  /// the values 11 and 20, and then the error at 601.
  /// </summary>
  [Fact]
  public void ReplayCount_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Timeline in chronological order; every scheduled time below is distinct.
      testScheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(3, testScheduler));
      testScheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      testScheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      testScheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnError<int>(601, error)
      };
      observer.Messages.AssertEqual(expectedMessages);

      var expectedSubscriptions = new[]
      {
          Subscribe(300, 400),
          Subscribe(500, 600)
      };
      source.Subscriptions.AssertEqual(expectedSubscriptions);
  }
  /// <summary>
  /// Count-based replay (buffer size 3) where the source completes at 600 while the second
  /// connection is still active. The observer is expected to see the buffered values 5, 6, 7,
  /// the values 11 and 20, and then completion at 601.
  /// </summary>
  [Fact]
  public void ReplayCount_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Timeline in chronological order; every scheduled time below is distinct.
      testScheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(3, testScheduler));
      testScheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      testScheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      testScheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnCompleted<int>(601)
      };
      observer.Messages.AssertEqual(expectedMessages);

      var expectedSubscriptions = new[]
      {
          Subscribe(300, 400),
          Subscribe(500, 600)
      };
      source.Subscriptions.AssertEqual(expectedSubscriptions);
  }
  /// <summary>
  /// Count-based replay (buffer size 3) where the observer unsubscribes at 475, before the
  /// second connection is made. Only the buffered values 5, 6, 7 are expected to be observed.
  /// </summary>
  [Fact]
  public void ReplayCount_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Timeline in chronological order; every scheduled time below is distinct.
      testScheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(3, testScheduler));
      testScheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      testScheduler.ScheduleAbsolute(475, () => subscription.Dispose());
      testScheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(550, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(650, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());

      testScheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7)
      };
      observer.Messages.AssertEqual(expectedMessages);

      var expectedSubscriptions = new[]
      {
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      };
      source.Subscriptions.AssertEqual(expectedSubscriptions);
  }
  /// <summary>
  /// For a count-based replay, two <c>Connect</c> calls made while connected must return the same
  /// connection object, and a <c>Connect</c> made after disposal must return a different one.
  /// </summary>
  [Fact]
  public void ReplayCount_MultipleConnections()
  {
      var replayed = Observable.Never<int>().Replay(3, new TestScheduler());

      var first = replayed.Connect();
      var second = replayed.Connect();
      Assert.Same(first, second);

      // Both references are disposed, as in the sibling time-based test.
      first.Dispose();
      second.Dispose();

      var third = replayed.Connect();
      Assert.NotSame(first, third);

      third.Dispose();
  }
  /// <summary>
  /// Selector-based replay with buffer size 3, where the selector is <c>Take(6).Repeat()</c>
  /// over the shared sequence. The run is disposed at 610, after the source completes at 600,
  /// and the expected messages show the last three values being replayed on each repetition.
  /// </summary>
  [Fact]
  public void ReplayCountLambda_Zip_Complete()
  {
      const long disposeAt = 610;
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var observer = testScheduler.Start(
          () => source.Replay(shared => shared.Take(6).Repeat(), 3, testScheduler),
          disposeAt
      );

      var expectedMessages = new[]
      {
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          OnNext(562, 9),
          OnNext(563, 11),
          OnNext(564, 20),
          OnNext(602, 9),
          OnNext(603, 11),
          OnNext(604, 20),
          OnNext(606, 9),
          OnNext(607, 11),
          OnNext(608, 20)
      };
      observer.Messages.AssertEqual(expectedMessages);

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Selector-based replay with buffer size 3, where the selector is <c>Take(6).Repeat()</c>
  /// over the shared sequence and the source fails at 600. The error is expected to reach the
  /// observer at 601, ending the repetition.
  /// </summary>
  [Fact]
  public void ReplayCountLambda_Zip_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      var observer = testScheduler.Start(
          () => source.Replay(shared => shared.Take(6).Repeat(), 3, testScheduler)
      );

      var expectedMessages = new[]
      {
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          OnNext(562, 9),
          OnNext(563, 11),
          OnNext(564, 20),
          OnError<int>(601, error)
      };
      observer.Messages.AssertEqual(expectedMessages);

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Selector-based replay with buffer size 3, where the selector is <c>Take(6).Repeat()</c>
  /// over the shared sequence and the run is disposed at 470. The source subscription is
  /// expected to end at 470, with no messages after 451.
  /// </summary>
  [Fact]
  public void ReplayCountLambda_Zip_Dispose()
  {
      const long disposeAt = 470;
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var observer = testScheduler.Start(
          () => source.Replay(shared => shared.Take(6).Repeat(), 3, testScheduler),
          disposeAt
      );

      var expectedMessages = new[]
      {
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9)
      };
      observer.Messages.AssertEqual(expectedMessages);

      source.Subscriptions.AssertEqual(
          Subscribe(200, 470)
      );
  }
  /// <summary>
  /// Time-based replay (window of 150 ticks) over three connect/disconnect cycles. The observer
  /// subscribes at 450 and is expected to see the buffered values 8, 5, 6, 7 on ticks 451-454,
  /// followed by the value 11 at 521.
  /// </summary>
  [Fact]
  public void ReplayTime_Basic()
  {
      var testScheduler = new TestScheduler();
      var window = TimeSpan.FromTicks(150);

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Timeline in chronological order; every scheduled time below is distinct.
      testScheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(window, testScheduler));
      testScheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      testScheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(550, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(650, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      testScheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(451, 8),
          OnNext(452, 5),
          OnNext(453, 6),
          OnNext(454, 7),
          OnNext(521, 11)
      };
      observer.Messages.AssertEqual(expectedMessages);

      var expectedSubscriptions = new[]
      {
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      };
      source.Subscriptions.AssertEqual(expectedSubscriptions);
  }
  /// <summary>
  /// Time-based replay (window of 75 ticks) where the source fails at 600 while the second
  /// connection is still active. The observer is expected to see the buffered value 7, the
  /// values 11 and 20, and then the error at 601.
  /// </summary>
  [Fact]
  public void ReplayTime_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();
      var window = TimeSpan.FromTicks(75);

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Timeline in chronological order; every scheduled time below is distinct.
      testScheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(window, testScheduler));
      testScheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      testScheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      testScheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(451, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnError<int>(601, error)
      };
      observer.Messages.AssertEqual(expectedMessages);

      var expectedSubscriptions = new[]
      {
          Subscribe(300, 400),
          Subscribe(500, 600)
      };
      source.Subscriptions.AssertEqual(expectedSubscriptions);
  }
  /// <summary>
  /// Time-based replay (window of 85 ticks) where the source completes at 600 while the second
  /// connection is still active. The observer is expected to see the buffered values 6 and 7,
  /// the values 11 and 20, and then completion at 601.
  /// </summary>
  [Fact]
  public void ReplayTime_Complete()
  {
      var testScheduler = new TestScheduler();
      var window = TimeSpan.FromTicks(85);

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Timeline in chronological order; every scheduled time below is distinct.
      testScheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(window, testScheduler));
      testScheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      testScheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      testScheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(451, 6),
          OnNext(452, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnCompleted<int>(601)
      };
      observer.Messages.AssertEqual(expectedMessages);

      var expectedSubscriptions = new[]
      {
          Subscribe(300, 400),
          Subscribe(500, 600)
      };
      source.Subscriptions.AssertEqual(expectedSubscriptions);
  }
  /// <summary>
  /// Time-based replay (window of 100 ticks) where the observer unsubscribes at 475, before the
  /// second connection is made. Only the buffered values 5, 6, 7 are expected to be observed.
  /// </summary>
  [Fact]
  public void ReplayTime_Dispose()
  {
      var testScheduler = new TestScheduler();
      var window = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Timeline in chronological order; every scheduled time below is distinct.
      testScheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(window, testScheduler));
      testScheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      testScheduler.ScheduleAbsolute(475, () => subscription.Dispose());
      testScheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(550, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(650, () => connection = replayed.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());

      testScheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7)
      };
      observer.Messages.AssertEqual(expectedMessages);

      var expectedSubscriptions = new[]
      {
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      };
      source.Subscriptions.AssertEqual(expectedSubscriptions);
  }
  /// <summary>
  /// For a time-based replay, two <c>Connect</c> calls made while connected must return the same
  /// connection object, and a <c>Connect</c> made after disposal must return a different one.
  /// </summary>
  [Fact]
  public void ReplayTime_MultipleConnections()
  {
      var replayed = Observable.Never<int>().Replay(TimeSpan.FromTicks(100), new TestScheduler());

      var first = replayed.Connect();
      var second = replayed.Connect();
      Assert.Same(first, second);

      // Both references are disposed, as in the sibling count-based test.
      first.Dispose();
      second.Dispose();

      var third = replayed.Connect();
      Assert.NotSame(first, third);

      third.Dispose();
  }
  /// <summary>
  /// Selector-based replay with a 50-tick window, where the selector is <c>Take(6).Repeat()</c>
  /// over the shared sequence. The run is disposed at 610, after the source completes at 600,
  /// and the expected messages list what each repetition replays.
  /// </summary>
  [Fact]
  public void ReplayTimeLambda_Zip_Complete()
  {
      const long disposeAt = 610;
      var testScheduler = new TestScheduler();
      var window = TimeSpan.FromTicks(50);

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var observer = testScheduler.Start(
          () => source.Replay(shared => shared.Take(6).Repeat(), window, testScheduler),
          disposeAt
      );

      var expectedMessages = new[]
      {
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          OnNext(562, 11),
          OnNext(563, 20),
          OnNext(602, 20),
          OnNext(604, 20),
          OnNext(606, 20),
          OnNext(608, 20)
      };
      observer.Messages.AssertEqual(expectedMessages);

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Selector-based replay with a 50-tick window, where the selector is <c>Take(6).Repeat()</c>
  /// over the shared sequence and the source fails at 600. The error is expected to reach the
  /// observer at 601.
  /// </summary>
  [Fact]
  public void ReplayTimeLambda_Zip_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();
      var window = TimeSpan.FromTicks(50);

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      var observer = testScheduler.Start(
          () => source.Replay(shared => shared.Take(6).Repeat(), window, testScheduler)
      );

      var expectedMessages = new[]
      {
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          OnNext(562, 11),
          OnNext(563, 20),
          OnError<int>(601, error)
      };
      observer.Messages.AssertEqual(expectedMessages);

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Selector-based replay with a 50-tick window, where the selector is <c>Take(6).Repeat()</c>
  /// over the shared sequence and the run is disposed at 470. The source subscription is
  /// expected to end at 470, with no messages after 451.
  /// </summary>
  [Fact]
  public void ReplayTimeLambda_Zip_Dispose()
  {
      const long disposeAt = 470;
      var testScheduler = new TestScheduler();
      var window = TimeSpan.FromTicks(50);

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var observer = testScheduler.Start(
          () => source.Replay(shared => shared.Take(6).Repeat(), window, testScheduler),
          disposeAt
      );

      var expectedMessages = new[]
      {
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9)
      };
      observer.Messages.AssertEqual(expectedMessages);

      source.Subscriptions.AssertEqual(
          Subscribe(200, 470)
      );
  }
  /// <summary>
  /// Compares <c>Replay(bufferSize)</c> with <c>Replay(bufferSize, DefaultScheduler.Instance)</c>
  /// over the same subject, using the <c>AssertEqual</c> helper on the two replayed sequences.
  /// </summary>
  [Fact]
  public void Replay_Default1()
  {
      var subject = new Subject<int>();

      var withExplicitScheduler = subject.Replay(100, DefaultScheduler.Instance);
      var withDefaultScheduler = subject.Replay(100);

      withExplicitScheduler.Connect();
      withDefaultScheduler.Connect();

      // Feed both replays the same two values and then complete.
      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withExplicitScheduler.AssertEqual(withDefaultScheduler);
  }
  /// <summary>
  /// Compares <c>Replay(window)</c> with <c>Replay(window, DefaultScheduler.Instance)</c> over
  /// the same subject, using the <c>AssertEqual</c> helper on the two replayed sequences.
  /// </summary>
  [Fact]
  public void Replay_Default2()
  {
      var subject = new Subject<int>();

      var withExplicitScheduler = subject.Replay(TimeSpan.FromHours(1), DefaultScheduler.Instance);
      var withDefaultScheduler = subject.Replay(TimeSpan.FromHours(1));

      withExplicitScheduler.Connect();
      withDefaultScheduler.Connect();

      // Feed both replays the same two values and then complete.
      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withExplicitScheduler.AssertEqual(withDefaultScheduler);
  }
  /// <summary>
  /// Compares <c>Replay(bufferSize, window)</c> with
  /// <c>Replay(bufferSize, window, DefaultScheduler.Instance)</c> over the same subject, using
  /// the <c>AssertEqual</c> helper on the two replayed sequences.
  /// </summary>
  [Fact]
  public void Replay_Default3()
  {
      var subject = new Subject<int>();

      var withExplicitScheduler = subject.Replay(100, TimeSpan.FromHours(1), DefaultScheduler.Instance);
      var withDefaultScheduler = subject.Replay(100, TimeSpan.FromHours(1));

      withExplicitScheduler.Connect();
      withDefaultScheduler.Connect();

      // Feed both replays the same two values and then complete.
      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withExplicitScheduler.AssertEqual(withDefaultScheduler);
  }
  /// <summary>
  /// Compares parameterless <c>Replay()</c> with <c>Replay(DefaultScheduler.Instance)</c> over
  /// the same subject, using the <c>AssertEqual</c> helper on the two replayed sequences.
  /// </summary>
  [Fact]
  public void Replay_Default4()
  {
      var subject = new Subject<int>();

      var withExplicitScheduler = subject.Replay(DefaultScheduler.Instance);
      var withDefaultScheduler = subject.Replay();

      withExplicitScheduler.Connect();
      withDefaultScheduler.Connect();

      // Feed both replays the same two values and then complete.
      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withExplicitScheduler.AssertEqual(withDefaultScheduler);
  }
  /// <summary>
  /// Compares the selector overload <c>Replay(selector, bufferSize)</c> with its
  /// <c>DefaultScheduler.Instance</c> counterpart, both applied to <c>Range(1, 10)</c> with an
  /// identity selector.
  /// </summary>
  [Fact]
  public void ReplayLambda_Default1()
  {
      var withExplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, 100, DefaultScheduler.Instance);

      var withDefaultScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, 100);

      withExplicitScheduler.AssertEqual(withDefaultScheduler);
  }
  /// <summary>
  /// Compares the selector overload <c>Replay(selector, window)</c> with its
  /// <c>DefaultScheduler.Instance</c> counterpart, both applied to <c>Range(1, 10)</c> with an
  /// identity selector.
  /// </summary>
  [Fact]
  public void ReplayLambda_Default2()
  {
      var withExplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, TimeSpan.FromHours(1), DefaultScheduler.Instance);

      var withDefaultScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, TimeSpan.FromHours(1));

      withExplicitScheduler.AssertEqual(withDefaultScheduler);
  }
  /// <summary>
  /// Compares the selector overload <c>Replay(selector, bufferSize, window)</c> with its
  /// <c>DefaultScheduler.Instance</c> counterpart, both applied to <c>Range(1, 10)</c> with an
  /// identity selector.
  /// </summary>
  [Fact]
  public void ReplayLambda_Default3()
  {
      var withExplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, 100, TimeSpan.FromHours(1), DefaultScheduler.Instance);

      var withDefaultScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, 100, TimeSpan.FromHours(1));

      withExplicitScheduler.AssertEqual(withDefaultScheduler);
  }
  /// <summary>
  /// Compares the selector overload <c>Replay(selector)</c> with its
  /// <c>DefaultScheduler.Instance</c> counterpart, both applied to <c>Range(1, 10)</c> with an
  /// identity selector.
  /// </summary>
  [Fact]
  public void ReplayLambda_Default4()
  {
      var withExplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, DefaultScheduler.Instance);

      var withDefaultScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared);

      withExplicitScheduler.AssertEqual(withDefaultScheduler);
  }
  851. }
  852. }