1
0

ObservableBindingTest.cs 91 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Linq;
  9. using System.Reactive.Subjects;
  10. using Microsoft.Reactive.Testing;
  11. using Xunit;
  12. using ReactiveTests.Dummies;
  13. namespace ReactiveTests.Tests
  14. {
  15. public partial class ObservableBindingTest : ReactiveTest
  16. {
  17. #region Multicast
  18. [Fact]
  19. public void Multicast_ArgumentChecking()
  20. {
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int>(null, new Subject<int>()));
  22. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int>(DummyObservable<int>.Instance, null));
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int, int>(null, () => new Subject<int>(), xs => xs));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int, int>(DummyObservable<int>.Instance, null, xs => xs));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int, int>(DummyObservable<int>.Instance, () => new Subject<int>(), null));
  26. }
  27. [Fact]
  28. public void Multicast_Hot_1()
  29. {
  30. var scheduler = new TestScheduler();
  31. var s = new Subject<int>();
  32. var xs = scheduler.CreateHotObservable(
  33. OnNext(40, 0),
  34. OnNext(90, 1),
  35. OnNext(150, 2),
  36. OnNext(210, 3),
  37. OnNext(240, 4),
  38. OnNext(270, 5),
  39. OnNext(330, 6),
  40. OnNext(340, 7),
  41. OnCompleted<int>(390)
  42. );
  43. var c = default(IConnectableObservable<int>);
  44. var o = scheduler.CreateObserver<int>();
  45. var d1 = default(IDisposable);
  46. var d2 = default(IDisposable);
  47. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  48. scheduler.ScheduleAbsolute(100, () => d1 = c.Subscribe(o));
  49. scheduler.ScheduleAbsolute(200, () => d2 = c.Connect());
  50. scheduler.ScheduleAbsolute(300, () => d1.Dispose());
  51. scheduler.Start();
  52. o.Messages.AssertEqual(
  53. OnNext(210, 3),
  54. OnNext(240, 4),
  55. OnNext(270, 5)
  56. );
  57. xs.Subscriptions.AssertEqual(
  58. Subscribe(200, 390)
  59. );
  60. }
  61. [Fact]
  62. public void Multicast_Hot_2()
  63. {
  64. var scheduler = new TestScheduler();
  65. var s = new Subject<int>();
  66. var xs = scheduler.CreateHotObservable(
  67. OnNext(40, 0),
  68. OnNext(90, 1),
  69. OnNext(150, 2),
  70. OnNext(210, 3),
  71. OnNext(240, 4),
  72. OnNext(270, 5),
  73. OnNext(330, 6),
  74. OnNext(340, 7),
  75. OnCompleted<int>(390)
  76. );
  77. var c = default(IConnectableObservable<int>);
  78. var o = scheduler.CreateObserver<int>();
  79. var d1 = default(IDisposable);
  80. var d2 = default(IDisposable);
  81. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  82. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  83. scheduler.ScheduleAbsolute(200, () => d1 = c.Subscribe(o));
  84. scheduler.ScheduleAbsolute(300, () => d1.Dispose());
  85. scheduler.Start();
  86. o.Messages.AssertEqual(
  87. OnNext(210, 3),
  88. OnNext(240, 4),
  89. OnNext(270, 5)
  90. );
  91. xs.Subscriptions.AssertEqual(
  92. Subscribe(100, 390)
  93. );
  94. }
  95. [Fact]
  96. public void Multicast_Hot_3()
  97. {
  98. var scheduler = new TestScheduler();
  99. var s = new Subject<int>();
  100. var xs = scheduler.CreateHotObservable(
  101. OnNext(40, 0),
  102. OnNext(90, 1),
  103. OnNext(150, 2),
  104. OnNext(210, 3),
  105. OnNext(240, 4),
  106. OnNext(270, 5),
  107. OnNext(330, 6),
  108. OnNext(340, 7),
  109. OnCompleted<int>(390)
  110. );
  111. var c = default(IConnectableObservable<int>);
  112. var o = scheduler.CreateObserver<int>();
  113. var d1 = default(IDisposable);
  114. var d2 = default(IDisposable);
  115. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  116. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  117. scheduler.ScheduleAbsolute(200, () => d1 = c.Subscribe(o));
  118. scheduler.ScheduleAbsolute(300, () => d2.Dispose());
  119. scheduler.ScheduleAbsolute(335, () => d2 = c.Connect());
  120. scheduler.Start();
  121. o.Messages.AssertEqual(
  122. OnNext(210, 3),
  123. OnNext(240, 4),
  124. OnNext(270, 5),
  125. OnNext(340, 7),
  126. OnCompleted<int>(390)
  127. );
  128. xs.Subscriptions.AssertEqual(
  129. Subscribe(100, 300),
  130. Subscribe(335, 390)
  131. );
  132. }
  133. [Fact]
  134. public void Multicast_Hot_4()
  135. {
  136. var scheduler = new TestScheduler();
  137. var s = new Subject<int>();
  138. var ex = new Exception();
  139. var xs = scheduler.CreateHotObservable(
  140. OnNext(40, 0),
  141. OnNext(90, 1),
  142. OnNext(150, 2),
  143. OnNext(210, 3),
  144. OnNext(240, 4),
  145. OnNext(270, 5),
  146. OnNext(330, 6),
  147. OnNext(340, 7),
  148. OnError<int>(390, ex)
  149. );
  150. var c = default(IConnectableObservable<int>);
  151. var o = scheduler.CreateObserver<int>();
  152. var d1 = default(IDisposable);
  153. var d2 = default(IDisposable);
  154. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  155. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  156. scheduler.ScheduleAbsolute(200, () => d1 = c.Subscribe(o));
  157. scheduler.ScheduleAbsolute(300, () => d2.Dispose());
  158. scheduler.ScheduleAbsolute(335, () => d2 = c.Connect());
  159. scheduler.Start();
  160. o.Messages.AssertEqual(
  161. OnNext(210, 3),
  162. OnNext(240, 4),
  163. OnNext(270, 5),
  164. OnNext(340, 7),
  165. OnError<int>(390, ex)
  166. );
  167. xs.Subscriptions.AssertEqual(
  168. Subscribe(100, 300),
  169. Subscribe(335, 390)
  170. );
  171. }
  172. [Fact]
  173. public void Multicast_Hot_5()
  174. {
  175. var scheduler = new TestScheduler();
  176. var s = new Subject<int>();
  177. var ex = new Exception();
  178. var xs = scheduler.CreateHotObservable(
  179. OnNext(40, 0),
  180. OnNext(90, 1),
  181. OnNext(150, 2),
  182. OnNext(210, 3),
  183. OnNext(240, 4),
  184. OnNext(270, 5),
  185. OnNext(330, 6),
  186. OnNext(340, 7),
  187. OnError<int>(390, ex)
  188. );
  189. var c = default(IConnectableObservable<int>);
  190. var o = scheduler.CreateObserver<int>();
  191. var d1 = default(IDisposable);
  192. var d2 = default(IDisposable);
  193. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  194. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  195. scheduler.ScheduleAbsolute(400, () => d1 = c.Subscribe(o));
  196. scheduler.Start();
  197. o.Messages.AssertEqual(
  198. OnError<int>(400, ex)
  199. );
  200. xs.Subscriptions.AssertEqual(
  201. Subscribe(100, 390)
  202. );
  203. }
  204. [Fact]
  205. public void Multicast_Hot_6()
  206. {
  207. var scheduler = new TestScheduler();
  208. var s = new Subject<int>();
  209. var xs = scheduler.CreateHotObservable(
  210. OnNext(40, 0),
  211. OnNext(90, 1),
  212. OnNext(150, 2),
  213. OnNext(210, 3),
  214. OnNext(240, 4),
  215. OnNext(270, 5),
  216. OnNext(330, 6),
  217. OnNext(340, 7),
  218. OnCompleted<int>(390)
  219. );
  220. var c = default(IConnectableObservable<int>);
  221. var o = scheduler.CreateObserver<int>();
  222. var d1 = default(IDisposable);
  223. var d2 = default(IDisposable);
  224. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  225. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  226. scheduler.ScheduleAbsolute(400, () => d1 = c.Subscribe(o));
  227. scheduler.Start();
  228. o.Messages.AssertEqual(
  229. OnCompleted<int>(400)
  230. );
  231. xs.Subscriptions.AssertEqual(
  232. Subscribe(100, 390)
  233. );
  234. }
  235. [Fact]
  236. public void Multicast_Cold_Completed()
  237. {
  238. var scheduler = new TestScheduler();
  239. var xs = scheduler.CreateHotObservable(
  240. OnNext(40, 0),
  241. OnNext(90, 1),
  242. OnNext(150, 2),
  243. OnNext(210, 3),
  244. OnNext(240, 4),
  245. OnNext(270, 5),
  246. OnNext(330, 6),
  247. OnNext(340, 7),
  248. OnCompleted<int>(390)
  249. );
  250. var res = scheduler.Start(() =>
  251. xs.Multicast(() => new Subject<int>(), ys => ys)
  252. );
  253. res.Messages.AssertEqual(
  254. OnNext(210, 3),
  255. OnNext(240, 4),
  256. OnNext(270, 5),
  257. OnNext(330, 6),
  258. OnNext(340, 7),
  259. OnCompleted<int>(390)
  260. );
  261. xs.Subscriptions.AssertEqual(
  262. Subscribe(200, 390)
  263. );
  264. }
  265. [Fact]
  266. public void Multicast_Cold_Error()
  267. {
  268. var scheduler = new TestScheduler();
  269. var ex = new Exception();
  270. var xs = scheduler.CreateHotObservable(
  271. OnNext(40, 0),
  272. OnNext(90, 1),
  273. OnNext(150, 2),
  274. OnNext(210, 3),
  275. OnNext(240, 4),
  276. OnNext(270, 5),
  277. OnNext(330, 6),
  278. OnNext(340, 7),
  279. OnError<int>(390, ex)
  280. );
  281. var res = scheduler.Start(() =>
  282. xs.Multicast(() => new Subject<int>(), ys => ys)
  283. );
  284. res.Messages.AssertEqual(
  285. OnNext(210, 3),
  286. OnNext(240, 4),
  287. OnNext(270, 5),
  288. OnNext(330, 6),
  289. OnNext(340, 7),
  290. OnError<int>(390, ex)
  291. );
  292. xs.Subscriptions.AssertEqual(
  293. Subscribe(200, 390)
  294. );
  295. }
  296. [Fact]
  297. public void Multicast_Cold_Dispose()
  298. {
  299. var scheduler = new TestScheduler();
  300. var xs = scheduler.CreateHotObservable(
  301. OnNext(40, 0),
  302. OnNext(90, 1),
  303. OnNext(150, 2),
  304. OnNext(210, 3),
  305. OnNext(240, 4),
  306. OnNext(270, 5),
  307. OnNext(330, 6),
  308. OnNext(340, 7)
  309. );
  310. var res = scheduler.Start(() =>
  311. xs.Multicast(() => new Subject<int>(), ys => ys)
  312. );
  313. res.Messages.AssertEqual(
  314. OnNext(210, 3),
  315. OnNext(240, 4),
  316. OnNext(270, 5),
  317. OnNext(330, 6),
  318. OnNext(340, 7)
  319. );
  320. xs.Subscriptions.AssertEqual(
  321. Subscribe(200, 1000)
  322. );
  323. }
  324. [Fact]
  325. public void Multicast_Cold_Zip()
  326. {
  327. var scheduler = new TestScheduler();
  328. var xs = scheduler.CreateHotObservable(
  329. OnNext(40, 0),
  330. OnNext(90, 1),
  331. OnNext(150, 2),
  332. OnNext(210, 3),
  333. OnNext(240, 4),
  334. OnNext(270, 5),
  335. OnNext(330, 6),
  336. OnNext(340, 7),
  337. OnCompleted<int>(390)
  338. );
  339. var res = scheduler.Start(() =>
  340. xs.Multicast(() => new Subject<int>(), ys => ys.Zip(ys, (a, b) => a + b))
  341. );
  342. res.Messages.AssertEqual(
  343. OnNext(210, 6),
  344. OnNext(240, 8),
  345. OnNext(270, 10),
  346. OnNext(330, 12),
  347. OnNext(340, 14),
  348. OnCompleted<int>(390)
  349. );
  350. xs.Subscriptions.AssertEqual(
  351. Subscribe(200, 390)
  352. );
  353. }
  354. [Fact]
  355. public void Multicast_SubjectSelectorThrows()
  356. {
  357. var ex = new Exception();
  358. var scheduler = new TestScheduler();
  359. var xs = scheduler.CreateHotObservable(
  360. OnNext(210, 1),
  361. OnNext(240, 2),
  362. OnCompleted<int>(300)
  363. );
  364. var res = scheduler.Start(() =>
  365. xs.Multicast<int, int, int>(() => { throw ex; }, _ => _)
  366. );
  367. res.Messages.AssertEqual(
  368. OnError<int>(200, ex)
  369. );
  370. xs.Subscriptions.AssertEqual(
  371. );
  372. }
  373. [Fact]
  374. public void Multicast_SelectorThrows()
  375. {
  376. var ex = new Exception();
  377. var scheduler = new TestScheduler();
  378. var xs = scheduler.CreateHotObservable(
  379. OnNext(210, 1),
  380. OnNext(240, 2),
  381. OnCompleted<int>(300)
  382. );
  383. var res = scheduler.Start(() =>
  384. xs.Multicast<int, int, int>(() => new Subject<int>(), _ => { throw ex; })
  385. );
  386. res.Messages.AssertEqual(
  387. OnError<int>(200, ex)
  388. );
  389. xs.Subscriptions.AssertEqual(
  390. );
  391. }
  392. #endregion
  393. #region Publish
  394. [Fact]
  395. public void Publish_Cold_Zip()
  396. {
  397. var scheduler = new TestScheduler();
  398. var xs = scheduler.CreateHotObservable(
  399. OnNext(40, 0),
  400. OnNext(90, 1),
  401. OnNext(150, 2),
  402. OnNext(210, 3),
  403. OnNext(240, 4),
  404. OnNext(270, 5),
  405. OnNext(330, 6),
  406. OnNext(340, 7),
  407. OnCompleted<int>(390)
  408. );
  409. var res = scheduler.Start(() =>
  410. xs.Publish(ys => ys.Zip(ys, (a, b) => a + b))
  411. );
  412. res.Messages.AssertEqual(
  413. OnNext(210, 6),
  414. OnNext(240, 8),
  415. OnNext(270, 10),
  416. OnNext(330, 12),
  417. OnNext(340, 14),
  418. OnCompleted<int>(390)
  419. );
  420. xs.Subscriptions.AssertEqual(
  421. Subscribe(200, 390)
  422. );
  423. }
  424. [Fact]
  425. public void Publish_ArgumentChecking()
  426. {
  427. var someObservable = Observable.Empty<int>();
  428. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(default(IObservable<int>)));
  429. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(default(IObservable<int>), x => x));
  430. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish<int, int>(someObservable, null));
  431. }
  432. [Fact]
  433. public void Publish_Basic()
  434. {
  435. var scheduler = new TestScheduler();
  436. var xs = scheduler.CreateHotObservable(
  437. OnNext(110, 7),
  438. OnNext(220, 3),
  439. OnNext(280, 4),
  440. OnNext(290, 1),
  441. OnNext(340, 8),
  442. OnNext(360, 5),
  443. OnNext(370, 6),
  444. OnNext(390, 7),
  445. OnNext(410, 13),
  446. OnNext(430, 2),
  447. OnNext(450, 9),
  448. OnNext(520, 11),
  449. OnNext(560, 20),
  450. OnCompleted<int>(600)
  451. );
  452. var ys = default(IConnectableObservable<int>);
  453. var subscription = default(IDisposable);
  454. var connection = default(IDisposable);
  455. var res = scheduler.CreateObserver<int>();
  456. scheduler.ScheduleAbsolute(Created, () => ys = xs.Publish());
  457. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  458. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  459. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  460. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  461. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  462. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  463. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  464. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  465. scheduler.Start();
  466. res.Messages.AssertEqual(
  467. OnNext(340, 8),
  468. OnNext(360, 5),
  469. OnNext(370, 6),
  470. OnNext(390, 7),
  471. OnNext(520, 11)
  472. );
  473. xs.Subscriptions.AssertEqual(
  474. Subscribe(300, 400),
  475. Subscribe(500, 550),
  476. Subscribe(650, 800)
  477. );
  478. }
  479. [Fact]
  480. public void Publish_Error()
  481. {
  482. var scheduler = new TestScheduler();
  483. var ex = new Exception();
  484. var xs = scheduler.CreateHotObservable(
  485. OnNext(110, 7),
  486. OnNext(220, 3),
  487. OnNext(280, 4),
  488. OnNext(290, 1),
  489. OnNext(340, 8),
  490. OnNext(360, 5),
  491. OnNext(370, 6),
  492. OnNext(390, 7),
  493. OnNext(410, 13),
  494. OnNext(430, 2),
  495. OnNext(450, 9),
  496. OnNext(520, 11),
  497. OnNext(560, 20),
  498. OnError<int>(600, ex)
  499. );
  500. var ys = default(IConnectableObservable<int>);
  501. var subscription = default(IDisposable);
  502. var connection = default(IDisposable);
  503. var res = scheduler.CreateObserver<int>();
  504. scheduler.ScheduleAbsolute(Created, () => ys = xs.Publish());
  505. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  506. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  507. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  508. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  509. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  510. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  511. scheduler.Start();
  512. res.Messages.AssertEqual(
  513. OnNext(340, 8),
  514. OnNext(360, 5),
  515. OnNext(370, 6),
  516. OnNext(390, 7),
  517. OnNext(520, 11),
  518. OnNext(560, 20),
  519. OnError<int>(600, ex)
  520. );
  521. xs.Subscriptions.AssertEqual(
  522. Subscribe(300, 400),
  523. Subscribe(500, 600)
  524. );
  525. }
  526. [Fact]
  527. public void Publish_Complete()
  528. {
  529. var scheduler = new TestScheduler();
  530. var xs = scheduler.CreateHotObservable(
  531. OnNext(110, 7),
  532. OnNext(220, 3),
  533. OnNext(280, 4),
  534. OnNext(290, 1),
  535. OnNext(340, 8),
  536. OnNext(360, 5),
  537. OnNext(370, 6),
  538. OnNext(390, 7),
  539. OnNext(410, 13),
  540. OnNext(430, 2),
  541. OnNext(450, 9),
  542. OnNext(520, 11),
  543. OnNext(560, 20),
  544. OnCompleted<int>(600)
  545. );
  546. var ys = default(IConnectableObservable<int>);
  547. var subscription = default(IDisposable);
  548. var connection = default(IDisposable);
  549. var res = scheduler.CreateObserver<int>();
  550. scheduler.ScheduleAbsolute(Created, () => ys = xs.Publish());
  551. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  552. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  553. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  554. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  555. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  556. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  557. scheduler.Start();
  558. res.Messages.AssertEqual(
  559. OnNext(340, 8),
  560. OnNext(360, 5),
  561. OnNext(370, 6),
  562. OnNext(390, 7),
  563. OnNext(520, 11),
  564. OnNext(560, 20),
  565. OnCompleted<int>(600)
  566. );
  567. xs.Subscriptions.AssertEqual(
  568. Subscribe(300, 400),
  569. Subscribe(500, 600)
  570. );
  571. }
  572. [Fact]
  573. public void Publish_Dispose()
  574. {
  575. var scheduler = new TestScheduler();
  576. var xs = scheduler.CreateHotObservable(
  577. OnNext(110, 7),
  578. OnNext(220, 3),
  579. OnNext(280, 4),
  580. OnNext(290, 1),
  581. OnNext(340, 8),
  582. OnNext(360, 5),
  583. OnNext(370, 6),
  584. OnNext(390, 7),
  585. OnNext(410, 13),
  586. OnNext(430, 2),
  587. OnNext(450, 9),
  588. OnNext(520, 11),
  589. OnNext(560, 20),
  590. OnCompleted<int>(600)
  591. );
  592. var ys = default(IConnectableObservable<int>);
  593. var subscription = default(IDisposable);
  594. var connection = default(IDisposable);
  595. var res = scheduler.CreateObserver<int>();
  596. scheduler.ScheduleAbsolute(Created, () => ys = xs.Publish());
  597. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  598. scheduler.ScheduleAbsolute(350, () => subscription.Dispose());
  599. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  600. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  601. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  602. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  603. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  604. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  605. scheduler.Start();
  606. res.Messages.AssertEqual(
  607. OnNext(340, 8)
  608. );
  609. xs.Subscriptions.AssertEqual(
  610. Subscribe(300, 400),
  611. Subscribe(500, 550),
  612. Subscribe(650, 800)
  613. );
  614. }
  615. [Fact]
  616. public void Publish_MultipleConnections()
  617. {
  618. var xs = Observable.Never<int>();
  619. var ys = xs.Publish();
  620. var connection1 = ys.Connect();
  621. var connection2 = ys.Connect();
  622. Assert.Same(connection1, connection2);
  623. connection1.Dispose();
  624. connection2.Dispose();
  625. var connection3 = ys.Connect();
  626. Assert.NotSame(connection1, connection3);
  627. connection3.Dispose();
  628. }
  629. [Fact]
  630. public void PublishLambda_Zip_Complete()
  631. {
  632. var scheduler = new TestScheduler();
  633. var xs = scheduler.CreateHotObservable(
  634. OnNext(110, 7),
  635. OnNext(220, 3),
  636. OnNext(280, 4),
  637. OnNext(290, 1),
  638. OnNext(340, 8),
  639. OnNext(360, 5),
  640. OnNext(370, 6),
  641. OnNext(390, 7),
  642. OnNext(410, 13),
  643. OnNext(430, 2),
  644. OnNext(450, 9),
  645. OnNext(520, 11),
  646. OnNext(560, 20),
  647. OnCompleted<int>(600)
  648. );
  649. var res = scheduler.Start(() =>
  650. xs.Publish(_xs => _xs.Zip(_xs.Skip(1), (prev, cur) => cur + prev))
  651. );
  652. res.Messages.AssertEqual(
  653. OnNext(280, 7),
  654. OnNext(290, 5),
  655. OnNext(340, 9),
  656. OnNext(360, 13),
  657. OnNext(370, 11),
  658. OnNext(390, 13),
  659. OnNext(410, 20),
  660. OnNext(430, 15),
  661. OnNext(450, 11),
  662. OnNext(520, 20),
  663. OnNext(560, 31),
  664. OnCompleted<int>(600)
  665. );
  666. xs.Subscriptions.AssertEqual(
  667. Subscribe(200, 600)
  668. );
  669. }
  670. [Fact]
  671. public void PublishLambda_Zip_Error()
  672. {
  673. var scheduler = new TestScheduler();
  674. var ex = new Exception();
  675. var xs = scheduler.CreateHotObservable(
  676. OnNext(110, 7),
  677. OnNext(220, 3),
  678. OnNext(280, 4),
  679. OnNext(290, 1),
  680. OnNext(340, 8),
  681. OnNext(360, 5),
  682. OnNext(370, 6),
  683. OnNext(390, 7),
  684. OnNext(410, 13),
  685. OnNext(430, 2),
  686. OnNext(450, 9),
  687. OnNext(520, 11),
  688. OnNext(560, 20),
  689. OnError<int>(600, ex)
  690. );
  691. var res = scheduler.Start(() =>
  692. xs.Publish(_xs => _xs.Zip(_xs.Skip(1), (prev, cur) => cur + prev))
  693. );
  694. res.Messages.AssertEqual(
  695. OnNext(280, 7),
  696. OnNext(290, 5),
  697. OnNext(340, 9),
  698. OnNext(360, 13),
  699. OnNext(370, 11),
  700. OnNext(390, 13),
  701. OnNext(410, 20),
  702. OnNext(430, 15),
  703. OnNext(450, 11),
  704. OnNext(520, 20),
  705. OnNext(560, 31),
  706. OnError<int>(600, ex)
  707. );
  708. xs.Subscriptions.AssertEqual(
  709. Subscribe(200, 600)
  710. );
  711. }
  712. [Fact]
  713. public void PublishLambda_Zip_Dispose()
  714. {
  715. var scheduler = new TestScheduler();
  716. var xs = scheduler.CreateHotObservable(
  717. OnNext(110, 7),
  718. OnNext(220, 3),
  719. OnNext(280, 4),
  720. OnNext(290, 1),
  721. OnNext(340, 8),
  722. OnNext(360, 5),
  723. OnNext(370, 6),
  724. OnNext(390, 7),
  725. OnNext(410, 13),
  726. OnNext(430, 2),
  727. OnNext(450, 9),
  728. OnNext(520, 11),
  729. OnNext(560, 20),
  730. OnCompleted<int>(600)
  731. );
  732. var res = scheduler.Start(() =>
  733. xs.Publish(_xs => _xs.Zip(_xs.Skip(1), (prev, cur) => cur + prev)),
  734. 470
  735. );
  736. res.Messages.AssertEqual(
  737. OnNext(280, 7),
  738. OnNext(290, 5),
  739. OnNext(340, 9),
  740. OnNext(360, 13),
  741. OnNext(370, 11),
  742. OnNext(390, 13),
  743. OnNext(410, 20),
  744. OnNext(430, 15),
  745. OnNext(450, 11)
  746. );
  747. xs.Subscriptions.AssertEqual(
  748. Subscribe(200, 470)
  749. );
  750. }
  751. [Fact]
  752. public void PublishWithInitialValue_ArgumentChecking()
  753. {
  754. var someObservable = Observable.Empty<int>();
  755. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(default(IObservable<int>), 1));
  756. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(default(IObservable<int>), x => x, 1));
  757. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(someObservable, default(Func<IObservable<int>, IObservable<int>>), 1));
  758. }
  759. [Fact]
  760. public void PublishWithInitialValue_SanityCheck()
  761. {
  762. var someObservable = Observable.Empty<int>();
  763. Observable.Publish(Observable.Range(1, 10), x => x, 0).AssertEqual(Observable.Range(0, 11));
  764. }
  765. [Fact]
  766. public void PublishWithInitialValue_Basic()
  767. {
  768. var scheduler = new TestScheduler();
  769. var xs = scheduler.CreateHotObservable(
  770. OnNext(110, 7),
  771. OnNext(220, 3),
  772. OnNext(280, 4),
  773. OnNext(290, 1),
  774. OnNext(340, 8),
  775. OnNext(360, 5),
  776. OnNext(370, 6),
  777. OnNext(390, 7),
  778. OnNext(410, 13),
  779. OnNext(430, 2),
  780. OnNext(450, 9),
  781. OnNext(520, 11),
  782. OnNext(560, 20),
  783. OnCompleted<int>(600)
  784. );
  785. var ys = default(IConnectableObservable<int>);
  786. var subscription = default(IDisposable);
  787. var connection = default(IDisposable);
  788. var res = scheduler.CreateObserver<int>();
  789. scheduler.ScheduleAbsolute(Created, () => ys = xs.Publish(1979));
  790. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  791. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  792. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  793. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  794. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  795. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  796. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  797. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  798. scheduler.Start();
  799. res.Messages.AssertEqual(
  800. OnNext(200, 1979),
  801. OnNext(340, 8),
  802. OnNext(360, 5),
  803. OnNext(370, 6),
  804. OnNext(390, 7),
  805. OnNext(520, 11)
  806. );
  807. xs.Subscriptions.AssertEqual(
  808. Subscribe(300, 400),
  809. Subscribe(500, 550),
  810. Subscribe(650, 800)
  811. );
  812. }
  813. [Fact]
  814. public void PublishWithInitialValue_Error()
  815. {
  816. var scheduler = new TestScheduler();
  817. var ex = new Exception();
  818. var xs = scheduler.CreateHotObservable(
  819. OnNext(110, 7),
  820. OnNext(220, 3),
  821. OnNext(280, 4),
  822. OnNext(290, 1),
  823. OnNext(340, 8),
  824. OnNext(360, 5),
  825. OnNext(370, 6),
  826. OnNext(390, 7),
  827. OnNext(410, 13),
  828. OnNext(430, 2),
  829. OnNext(450, 9),
  830. OnNext(520, 11),
  831. OnNext(560, 20),
  832. OnError<int>(600, ex)
  833. );
  834. var ys = default(IConnectableObservable<int>);
  835. var subscription = default(IDisposable);
  836. var connection = default(IDisposable);
  837. var res = scheduler.CreateObserver<int>();
  838. scheduler.ScheduleAbsolute(Created, () => ys = xs.Publish(1979));
  839. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  840. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  841. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  842. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  843. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  844. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  845. scheduler.Start();
  846. res.Messages.AssertEqual(
  847. OnNext(200, 1979),
  848. OnNext(340, 8),
  849. OnNext(360, 5),
  850. OnNext(370, 6),
  851. OnNext(390, 7),
  852. OnNext(520, 11),
  853. OnNext(560, 20),
  854. OnError<int>(600, ex)
  855. );
  856. xs.Subscriptions.AssertEqual(
  857. Subscribe(300, 400),
  858. Subscribe(500, 600)
  859. );
  860. }
  861. [Fact]
  862. public void PublishWithInitialValue_Complete()
  863. {
  864. var scheduler = new TestScheduler();
  865. var xs = scheduler.CreateHotObservable(
  866. OnNext(110, 7),
  867. OnNext(220, 3),
  868. OnNext(280, 4),
  869. OnNext(290, 1),
  870. OnNext(340, 8),
  871. OnNext(360, 5),
  872. OnNext(370, 6),
  873. OnNext(390, 7),
  874. OnNext(410, 13),
  875. OnNext(430, 2),
  876. OnNext(450, 9),
  877. OnNext(520, 11),
  878. OnNext(560, 20),
  879. OnCompleted<int>(600)
  880. );
  881. var ys = default(IConnectableObservable<int>);
  882. var subscription = default(IDisposable);
  883. var connection = default(IDisposable);
  884. var res = scheduler.CreateObserver<int>();
  885. scheduler.ScheduleAbsolute(Created, () => ys = xs.Publish(1979));
  886. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  887. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  888. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  889. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  890. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  891. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  892. scheduler.Start();
  893. res.Messages.AssertEqual(
  894. OnNext(200, 1979),
  895. OnNext(340, 8),
  896. OnNext(360, 5),
  897. OnNext(370, 6),
  898. OnNext(390, 7),
  899. OnNext(520, 11),
  900. OnNext(560, 20),
  901. OnCompleted<int>(600)
  902. );
  903. xs.Subscriptions.AssertEqual(
  904. Subscribe(300, 400),
  905. Subscribe(500, 600)
  906. );
  907. }
  908. [Fact]
  909. public void PublishWithInitialValue_Dispose()
  910. {
  911. var scheduler = new TestScheduler();
  912. var xs = scheduler.CreateHotObservable(
  913. OnNext(110, 7),
  914. OnNext(220, 3),
  915. OnNext(280, 4),
  916. OnNext(290, 1),
  917. OnNext(340, 8),
  918. OnNext(360, 5),
  919. OnNext(370, 6),
  920. OnNext(390, 7),
  921. OnNext(410, 13),
  922. OnNext(430, 2),
  923. OnNext(450, 9),
  924. OnNext(520, 11),
  925. OnNext(560, 20),
  926. OnCompleted<int>(600)
  927. );
  928. var ys = default(IConnectableObservable<int>);
  929. var subscription = default(IDisposable);
  930. var connection = default(IDisposable);
  931. var res = scheduler.CreateObserver<int>();
  932. scheduler.ScheduleAbsolute(Created, () => ys = xs.Publish(1979));
  933. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  934. scheduler.ScheduleAbsolute(350, () => subscription.Dispose());
  935. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  936. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  937. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  938. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  939. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  940. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  941. scheduler.Start();
  942. res.Messages.AssertEqual(
  943. OnNext(200, 1979),
  944. OnNext(340, 8)
  945. );
  946. xs.Subscriptions.AssertEqual(
  947. Subscribe(300, 400),
  948. Subscribe(500, 550),
  949. Subscribe(650, 800)
  950. );
  951. }
  952. [Fact]
  953. public void PublishWithInitialValue_MultipleConnections()
  954. {
  955. var xs = Observable.Never<int>();
  956. var ys = xs.Publish(1979);
  957. var connection1 = ys.Connect();
  958. var connection2 = ys.Connect();
  959. Assert.Same(connection1, connection2);
  960. connection1.Dispose();
  961. connection2.Dispose();
  962. var connection3 = ys.Connect();
  963. Assert.NotSame(connection1, connection3);
  964. connection3.Dispose();
  965. }
  966. [Fact]
  967. public void PublishWithInitialValueLambda_Zip_Complete()
  968. {
  969. var scheduler = new TestScheduler();
  970. var xs = scheduler.CreateHotObservable(
  971. OnNext(110, 7),
  972. OnNext(220, 3),
  973. OnNext(280, 4),
  974. OnNext(290, 1),
  975. OnNext(340, 8),
  976. OnNext(360, 5),
  977. OnNext(370, 6),
  978. OnNext(390, 7),
  979. OnNext(410, 13),
  980. OnNext(430, 2),
  981. OnNext(450, 9),
  982. OnNext(520, 11),
  983. OnNext(560, 20),
  984. OnCompleted<int>(600)
  985. );
  986. var res = scheduler.Start(() =>
  987. xs.Publish(_xs => _xs.Zip(_xs.Skip(1), (prev, cur) => cur + prev), 1979)
  988. );
  989. res.Messages.AssertEqual(
  990. OnNext(220, 1982),
  991. OnNext(280, 7),
  992. OnNext(290, 5),
  993. OnNext(340, 9),
  994. OnNext(360, 13),
  995. OnNext(370, 11),
  996. OnNext(390, 13),
  997. OnNext(410, 20),
  998. OnNext(430, 15),
  999. OnNext(450, 11),
  1000. OnNext(520, 20),
  1001. OnNext(560, 31),
  1002. OnCompleted<int>(600)
  1003. );
  1004. xs.Subscriptions.AssertEqual(
  1005. Subscribe(200, 600)
  1006. );
  1007. }
  1008. [Fact]
  1009. public void PublishWithInitialValueLambda_Zip_Error()
  1010. {
  1011. var scheduler = new TestScheduler();
  1012. var ex = new Exception();
  1013. var xs = scheduler.CreateHotObservable(
  1014. OnNext(110, 7),
  1015. OnNext(220, 3),
  1016. OnNext(280, 4),
  1017. OnNext(290, 1),
  1018. OnNext(340, 8),
  1019. OnNext(360, 5),
  1020. OnNext(370, 6),
  1021. OnNext(390, 7),
  1022. OnNext(410, 13),
  1023. OnNext(430, 2),
  1024. OnNext(450, 9),
  1025. OnNext(520, 11),
  1026. OnNext(560, 20),
  1027. OnError<int>(600, ex)
  1028. );
  1029. var res = scheduler.Start(() =>
  1030. xs.Publish(_xs => _xs.Zip(_xs.Skip(1), (prev, cur) => cur + prev), 1979)
  1031. );
  1032. res.Messages.AssertEqual(
  1033. OnNext(220, 1982),
  1034. OnNext(280, 7),
  1035. OnNext(290, 5),
  1036. OnNext(340, 9),
  1037. OnNext(360, 13),
  1038. OnNext(370, 11),
  1039. OnNext(390, 13),
  1040. OnNext(410, 20),
  1041. OnNext(430, 15),
  1042. OnNext(450, 11),
  1043. OnNext(520, 20),
  1044. OnNext(560, 31),
  1045. OnError<int>(600, ex)
  1046. );
  1047. xs.Subscriptions.AssertEqual(
  1048. Subscribe(200, 600)
  1049. );
  1050. }
  1051. [Fact]
  1052. public void PublishWithInitialValueLambda_Zip_Dispose()
  1053. {
  1054. var scheduler = new TestScheduler();
  1055. var xs = scheduler.CreateHotObservable(
  1056. OnNext(110, 7),
  1057. OnNext(220, 3),
  1058. OnNext(280, 4),
  1059. OnNext(290, 1),
  1060. OnNext(340, 8),
  1061. OnNext(360, 5),
  1062. OnNext(370, 6),
  1063. OnNext(390, 7),
  1064. OnNext(410, 13),
  1065. OnNext(430, 2),
  1066. OnNext(450, 9),
  1067. OnNext(520, 11),
  1068. OnNext(560, 20),
  1069. OnCompleted<int>(600)
  1070. );
  1071. var res = scheduler.Start(() =>
  1072. xs.Publish(_xs => _xs.Zip(_xs.Skip(1), (prev, cur) => cur + prev), 1979),
  1073. 470
  1074. );
  1075. res.Messages.AssertEqual(
  1076. OnNext(220, 1982),
  1077. OnNext(280, 7),
  1078. OnNext(290, 5),
  1079. OnNext(340, 9),
  1080. OnNext(360, 13),
  1081. OnNext(370, 11),
  1082. OnNext(390, 13),
  1083. OnNext(410, 20),
  1084. OnNext(430, 15),
  1085. OnNext(450, 11)
  1086. );
  1087. xs.Subscriptions.AssertEqual(
  1088. Subscribe(200, 470)
  1089. );
  1090. }
  1091. #endregion
  1092. #region PublishLast
  1093. [Fact]
  1094. public void PublishLast_ArgumentChecking()
  1095. {
  1096. var someObservable = Observable.Empty<int>();
  1097. var scheduler = new TestScheduler();
  1098. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.PublishLast(default(IObservable<int>)));
  1099. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.PublishLast(default(IObservable<int>), x => x));
  1100. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.PublishLast<int, int>(someObservable, null));
  1101. }
  1102. [Fact]
  1103. public void PublishLast_Basic()
  1104. {
  1105. var scheduler = new TestScheduler();
  1106. var xs = scheduler.CreateHotObservable(
  1107. OnNext(110, 7),
  1108. OnNext(220, 3),
  1109. OnNext(280, 4),
  1110. OnNext(290, 1),
  1111. OnNext(340, 8),
  1112. OnNext(360, 5),
  1113. OnNext(370, 6),
  1114. OnNext(390, 7),
  1115. OnNext(410, 13),
  1116. OnNext(430, 2),
  1117. OnNext(450, 9),
  1118. OnNext(520, 11),
  1119. OnNext(560, 20),
  1120. OnCompleted<int>(600)
  1121. );
  1122. var ys = default(IConnectableObservable<int>);
  1123. var subscription = default(IDisposable);
  1124. var connection = default(IDisposable);
  1125. var res = scheduler.CreateObserver<int>();
  1126. scheduler.ScheduleAbsolute(Created, () => ys = xs.PublishLast());
  1127. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  1128. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1129. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1130. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1131. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1132. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  1133. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  1134. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1135. scheduler.Start();
  1136. res.Messages.AssertEqual(
  1137. );
  1138. xs.Subscriptions.AssertEqual(
  1139. Subscribe(300, 400),
  1140. Subscribe(500, 550),
  1141. Subscribe(650, 800)
  1142. );
  1143. }
  1144. [Fact]
  1145. public void PublishLast_Error()
  1146. {
  1147. var scheduler = new TestScheduler();
  1148. var ex = new Exception();
  1149. var xs = scheduler.CreateHotObservable(
  1150. OnNext(110, 7),
  1151. OnNext(220, 3),
  1152. OnNext(280, 4),
  1153. OnNext(290, 1),
  1154. OnNext(340, 8),
  1155. OnNext(360, 5),
  1156. OnNext(370, 6),
  1157. OnNext(390, 7),
  1158. OnNext(410, 13),
  1159. OnNext(430, 2),
  1160. OnNext(450, 9),
  1161. OnNext(520, 11),
  1162. OnNext(560, 20),
  1163. OnError<int>(600, ex)
  1164. );
  1165. var ys = default(IConnectableObservable<int>);
  1166. var subscription = default(IDisposable);
  1167. var connection = default(IDisposable);
  1168. var res = scheduler.CreateObserver<int>();
  1169. scheduler.ScheduleAbsolute(Created, () => ys = xs.PublishLast());
  1170. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  1171. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1172. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1173. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1174. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1175. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1176. scheduler.Start();
  1177. res.Messages.AssertEqual(
  1178. OnError<int>(600, ex)
  1179. );
  1180. xs.Subscriptions.AssertEqual(
  1181. Subscribe(300, 400),
  1182. Subscribe(500, 600)
  1183. );
  1184. }
  1185. [Fact]
  1186. public void PublishLast_Complete()
  1187. {
  1188. var scheduler = new TestScheduler();
  1189. var xs = scheduler.CreateHotObservable(
  1190. OnNext(110, 7),
  1191. OnNext(220, 3),
  1192. OnNext(280, 4),
  1193. OnNext(290, 1),
  1194. OnNext(340, 8),
  1195. OnNext(360, 5),
  1196. OnNext(370, 6),
  1197. OnNext(390, 7),
  1198. OnNext(410, 13),
  1199. OnNext(430, 2),
  1200. OnNext(450, 9),
  1201. OnNext(520, 11),
  1202. OnNext(560, 20),
  1203. OnCompleted<int>(600)
  1204. );
  1205. var ys = default(IConnectableObservable<int>);
  1206. var subscription = default(IDisposable);
  1207. var connection = default(IDisposable);
  1208. var res = scheduler.CreateObserver<int>();
  1209. scheduler.ScheduleAbsolute(Created, () => ys = xs.PublishLast());
  1210. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  1211. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1212. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1213. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1214. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1215. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1216. scheduler.Start();
  1217. res.Messages.AssertEqual(
  1218. OnNext(600, 20),
  1219. OnCompleted<int>(600)
  1220. );
  1221. xs.Subscriptions.AssertEqual(
  1222. Subscribe(300, 400),
  1223. Subscribe(500, 600)
  1224. );
  1225. }
  1226. [Fact]
  1227. public void PublishLast_Dispose()
  1228. {
  1229. var scheduler = new TestScheduler();
  1230. var xs = scheduler.CreateHotObservable(
  1231. OnNext(110, 7),
  1232. OnNext(220, 3),
  1233. OnNext(280, 4),
  1234. OnNext(290, 1),
  1235. OnNext(340, 8),
  1236. OnNext(360, 5),
  1237. OnNext(370, 6),
  1238. OnNext(390, 7),
  1239. OnNext(410, 13),
  1240. OnNext(430, 2),
  1241. OnNext(450, 9),
  1242. OnNext(520, 11),
  1243. OnNext(560, 20),
  1244. OnCompleted<int>(600)
  1245. );
  1246. var ys = default(IConnectableObservable<int>);
  1247. var subscription = default(IDisposable);
  1248. var connection = default(IDisposable);
  1249. var res = scheduler.CreateObserver<int>();
  1250. scheduler.ScheduleAbsolute(Created, () => ys = xs.PublishLast());
  1251. scheduler.ScheduleAbsolute(Subscribed, () => subscription = ys.Subscribe(res));
  1252. scheduler.ScheduleAbsolute(350, () => subscription.Dispose());
  1253. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1254. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1255. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1256. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  1257. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  1258. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1259. scheduler.Start();
  1260. res.Messages.AssertEqual(
  1261. );
  1262. xs.Subscriptions.AssertEqual(
  1263. Subscribe(300, 400),
  1264. Subscribe(500, 550),
  1265. Subscribe(650, 800)
  1266. );
  1267. }
  1268. [Fact]
  1269. public void PublishLast_MultipleConnections()
  1270. {
  1271. var xs = Observable.Never<int>();
  1272. var ys = xs.PublishLast();
  1273. var connection1 = ys.Connect();
  1274. var connection2 = ys.Connect();
  1275. Assert.Same(connection1, connection2);
  1276. connection1.Dispose();
  1277. connection2.Dispose();
  1278. var connection3 = ys.Connect();
  1279. Assert.NotSame(connection1, connection3);
  1280. connection3.Dispose();
  1281. }
  1282. [Fact]
  1283. public void PublishLastLambda_Zip_Complete()
  1284. {
  1285. var scheduler = new TestScheduler();
  1286. var xs = scheduler.CreateHotObservable(
  1287. OnNext(110, 7),
  1288. OnNext(220, 3),
  1289. OnNext(280, 4),
  1290. OnNext(290, 1),
  1291. OnNext(340, 8),
  1292. OnNext(360, 5),
  1293. OnNext(370, 6),
  1294. OnNext(390, 7),
  1295. OnNext(410, 13),
  1296. OnNext(430, 2),
  1297. OnNext(450, 9),
  1298. OnNext(520, 11),
  1299. OnNext(560, 20),
  1300. OnCompleted<int>(600)
  1301. );
  1302. var res = scheduler.Start(() =>
  1303. xs.PublishLast(_xs => _xs.Zip(_xs, (x, y) => x + y))
  1304. );
  1305. res.Messages.AssertEqual(
  1306. OnNext(600, 40),
  1307. OnCompleted<int>(600)
  1308. );
  1309. xs.Subscriptions.AssertEqual(
  1310. Subscribe(200, 600)
  1311. );
  1312. }
  1313. [Fact]
  1314. public void PublishLastLambda_Zip_Error()
  1315. {
  1316. var scheduler = new TestScheduler();
  1317. var ex = new Exception();
  1318. var xs = scheduler.CreateHotObservable(
  1319. OnNext(110, 7),
  1320. OnNext(220, 3),
  1321. OnNext(280, 4),
  1322. OnNext(290, 1),
  1323. OnNext(340, 8),
  1324. OnNext(360, 5),
  1325. OnNext(370, 6),
  1326. OnNext(390, 7),
  1327. OnNext(410, 13),
  1328. OnNext(430, 2),
  1329. OnNext(450, 9),
  1330. OnNext(520, 11),
  1331. OnNext(560, 20),
  1332. OnError<int>(600, ex)
  1333. );
  1334. var res = scheduler.Start(() =>
  1335. xs.PublishLast(_xs => _xs.Zip(_xs, (x, y) => x + y))
  1336. );
  1337. res.Messages.AssertEqual(
  1338. OnError<int>(600, ex)
  1339. );
  1340. xs.Subscriptions.AssertEqual(
  1341. Subscribe(200, 600)
  1342. );
  1343. }
  1344. [Fact]
  1345. public void PublishLastLambda_Zip_Dispose()
  1346. {
  1347. var scheduler = new TestScheduler();
  1348. var xs = scheduler.CreateHotObservable(
  1349. OnNext(110, 7),
  1350. OnNext(220, 3),
  1351. OnNext(280, 4),
  1352. OnNext(290, 1),
  1353. OnNext(340, 8),
  1354. OnNext(360, 5),
  1355. OnNext(370, 6),
  1356. OnNext(390, 7),
  1357. OnNext(410, 13),
  1358. OnNext(430, 2),
  1359. OnNext(450, 9),
  1360. OnNext(520, 11),
  1361. OnNext(560, 20),
  1362. OnCompleted<int>(600)
  1363. );
  1364. var res = scheduler.Start(() =>
  1365. xs.PublishLast(_xs => _xs.Zip(_xs, (x, y) => x + y)),
  1366. 470
  1367. );
  1368. res.Messages.AssertEqual(
  1369. );
  1370. xs.Subscriptions.AssertEqual(
  1371. Subscribe(200, 470)
  1372. );
  1373. }
  1374. #endregion
  1375. #region RefCount
  1376. [Fact]
  1377. public void RefCount_ArgumentChecking()
  1378. {
  1379. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null));
  1380. }
  1381. [Fact]
  1382. public void RefCount_ConnectsOnFirst()
  1383. {
  1384. var scheduler = new TestScheduler();
  1385. var xs = scheduler.CreateHotObservable<int>(
  1386. OnNext(210, 1),
  1387. OnNext(220, 2),
  1388. OnNext(230, 3),
  1389. OnNext(240, 4),
  1390. OnCompleted<int>(250)
  1391. );
  1392. var subject = new MySubject();
  1393. var conn = new ConnectableObservable<int>(xs, subject);
  1394. var res = scheduler.Start(() =>
  1395. conn.RefCount()
  1396. );
  1397. res.Messages.AssertEqual(
  1398. OnNext(210, 1),
  1399. OnNext(220, 2),
  1400. OnNext(230, 3),
  1401. OnNext(240, 4),
  1402. OnCompleted<int>(250)
  1403. );
  1404. Assert.True(subject.Disposed);
  1405. }
  1406. [Fact]
  1407. public void RefCount_NotConnected()
  1408. {
  1409. var disconnected = false;
  1410. var count = 0;
  1411. var xs = Observable.Defer(() =>
  1412. {
  1413. count++;
  1414. return Observable.Create<int>(obs =>
  1415. {
  1416. return () => { disconnected = true; };
  1417. });
  1418. });
  1419. var subject = new MySubject();
  1420. var conn = new ConnectableObservable<int>(xs, subject);
  1421. var refd = conn.RefCount();
  1422. var dis1 = refd.Subscribe();
  1423. Assert.Equal(1, count);
  1424. Assert.Equal(1, subject.SubscribeCount);
  1425. Assert.False(disconnected);
  1426. var dis2 = refd.Subscribe();
  1427. Assert.Equal(1, count);
  1428. Assert.Equal(2, subject.SubscribeCount);
  1429. Assert.False(disconnected);
  1430. dis1.Dispose();
  1431. Assert.False(disconnected);
  1432. dis2.Dispose();
  1433. Assert.True(disconnected);
  1434. disconnected = false;
  1435. var dis3 = refd.Subscribe();
  1436. Assert.Equal(2, count);
  1437. Assert.Equal(3, subject.SubscribeCount);
  1438. Assert.False(disconnected);
  1439. dis3.Dispose();
  1440. Assert.True(disconnected);
  1441. }
  1442. [Fact]
  1443. public void RefCount_OnError()
  1444. {
  1445. var ex = new Exception();
  1446. var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
  1447. var res = xs.Publish().RefCount();
  1448. res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); });
  1449. res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); });
  1450. }
  1451. [Fact]
  1452. public void RefCount_Publish()
  1453. {
  1454. var scheduler = new TestScheduler();
  1455. var xs = scheduler.CreateHotObservable<int>(
  1456. OnNext(210, 1),
  1457. OnNext(220, 2),
  1458. OnNext(230, 3),
  1459. OnNext(240, 4),
  1460. OnNext(250, 5),
  1461. OnNext(260, 6),
  1462. OnNext(270, 7),
  1463. OnNext(280, 8),
  1464. OnNext(290, 9),
  1465. OnCompleted<int>(300)
  1466. );
  1467. var res = xs.Publish().RefCount();
  1468. var d1 = default(IDisposable);
  1469. var o1 = scheduler.CreateObserver<int>();
  1470. scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
  1471. scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });
  1472. var d2 = default(IDisposable);
  1473. var o2 = scheduler.CreateObserver<int>();
  1474. scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
  1475. scheduler.ScheduleAbsolute(275, () => { d2.Dispose(); });
  1476. var d3 = default(IDisposable);
  1477. var o3 = scheduler.CreateObserver<int>();
  1478. scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
  1479. scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });
  1480. var d4 = default(IDisposable);
  1481. var o4 = scheduler.CreateObserver<int>();
  1482. scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
  1483. scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });
  1484. scheduler.Start();
  1485. o1.Messages.AssertEqual(
  1486. OnNext(220, 2),
  1487. OnNext(230, 3)
  1488. );
  1489. o2.Messages.AssertEqual(
  1490. OnNext(230, 3),
  1491. OnNext(240, 4),
  1492. OnNext(250, 5),
  1493. OnNext(260, 6),
  1494. OnNext(270, 7)
  1495. );
  1496. o3.Messages.AssertEqual(
  1497. OnNext(260, 6)
  1498. );
  1499. o4.Messages.AssertEqual(
  1500. OnNext(290, 9),
  1501. OnCompleted<int>(300)
  1502. );
  1503. xs.Subscriptions.AssertEqual(
  1504. Subscribe(215, 275),
  1505. Subscribe(285, 300)
  1506. );
  1507. }
  1508. #endregion
  1509. #region Replay
  1510. [Fact]
  1511. public void Replay_ArgumentChecking()
  1512. {
  1513. var someObservable = Observable.Empty<int>();
  1514. var scheduler = new TestScheduler();
  1515. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>)));
  1516. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x));
  1517. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null));
  1518. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int>(null, DummyScheduler.Instance));
  1519. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int>(DummyObservable<int>.Instance, (IScheduler)null));
  1520. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(null, DummyFunc<IObservable<int>, IObservable<int>>.Instance, DummyScheduler.Instance));
  1521. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(DummyObservable<int>.Instance, null, DummyScheduler.Instance));
  1522. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(DummyObservable<int>.Instance, DummyFunc<IObservable<int>, IObservable<int>>.Instance, null));
  1523. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1)));
  1524. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1)));
  1525. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1)));
  1526. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1)));
  1527. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay<int, int>(someObservable, x => x, TimeSpan.FromSeconds(-1)));
  1528. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1), scheduler));
  1529. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1), scheduler));
  1530. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(1), default(IScheduler)));
  1531. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1), scheduler));
  1532. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1), scheduler));
  1533. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(-1), scheduler));
  1534. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(1), default(IScheduler)));
  1535. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, scheduler));
  1536. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, scheduler));
  1537. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, default(IScheduler)));
  1538. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, scheduler));
  1539. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, -2, scheduler));
  1540. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, scheduler));
  1541. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, default(IScheduler)));
  1542. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1));
  1543. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2));
  1544. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1));
  1545. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1));
  1546. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2));
  1547. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1)));
  1548. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1)));
  1549. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1)));
  1550. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1)));
  1551. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1)));
  1552. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1)));
  1553. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1)));
  1554. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1), scheduler));
  1555. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1), scheduler));
  1556. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1), scheduler));
  1557. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(1), null));
  1558. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1), scheduler));
  1559. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1), scheduler));
  1560. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1), scheduler));
  1561. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1), scheduler));
  1562. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(1), null));
  1563. }
  1564. [Fact]
  1565. public void ReplayCount_Basic()
  1566. {
  1567. var scheduler = new TestScheduler();
  1568. var xs = scheduler.CreateHotObservable(
  1569. OnNext(110, 7),
  1570. OnNext(220, 3),
  1571. OnNext(280, 4),
  1572. OnNext(290, 1),
  1573. OnNext(340, 8),
  1574. OnNext(360, 5),
  1575. OnNext(370, 6),
  1576. OnNext(390, 7),
  1577. OnNext(410, 13),
  1578. OnNext(430, 2),
  1579. OnNext(450, 9),
  1580. OnNext(520, 11),
  1581. OnNext(560, 20),
  1582. OnCompleted<int>(600)
  1583. );
  1584. var ys = default(IConnectableObservable<int>);
  1585. var subscription = default(IDisposable);
  1586. var connection = default(IDisposable);
  1587. var res = scheduler.CreateObserver<int>();
  1588. scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(3, scheduler));
  1589. scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
  1590. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1591. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1592. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1593. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1594. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  1595. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  1596. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1597. scheduler.Start();
  1598. res.Messages.AssertEqual(
  1599. OnNext(451, 5),
  1600. OnNext(452, 6),
  1601. OnNext(453, 7),
  1602. OnNext(521, 11)
  1603. );
  1604. xs.Subscriptions.AssertEqual(
  1605. Subscribe(300, 400),
  1606. Subscribe(500, 550),
  1607. Subscribe(650, 800)
  1608. );
  1609. }
  1610. [Fact]
  1611. public void ReplayCount_Error()
  1612. {
  1613. var scheduler = new TestScheduler();
  1614. var ex = new Exception();
  1615. var xs = scheduler.CreateHotObservable(
  1616. OnNext(110, 7),
  1617. OnNext(220, 3),
  1618. OnNext(280, 4),
  1619. OnNext(290, 1),
  1620. OnNext(340, 8),
  1621. OnNext(360, 5),
  1622. OnNext(370, 6),
  1623. OnNext(390, 7),
  1624. OnNext(410, 13),
  1625. OnNext(430, 2),
  1626. OnNext(450, 9),
  1627. OnNext(520, 11),
  1628. OnNext(560, 20),
  1629. OnError<int>(600, ex)
  1630. );
  1631. var ys = default(IConnectableObservable<int>);
  1632. var subscription = default(IDisposable);
  1633. var connection = default(IDisposable);
  1634. var res = scheduler.CreateObserver<int>();
  1635. scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(3, scheduler));
  1636. scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
  1637. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1638. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1639. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1640. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1641. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1642. scheduler.Start();
  1643. res.Messages.AssertEqual(
  1644. OnNext(451, 5),
  1645. OnNext(452, 6),
  1646. OnNext(453, 7),
  1647. OnNext(521, 11),
  1648. OnNext(561, 20),
  1649. OnError<int>(601, ex)
  1650. );
  1651. xs.Subscriptions.AssertEqual(
  1652. Subscribe(300, 400),
  1653. Subscribe(500, 600)
  1654. );
  1655. }
  1656. [Fact]
  1657. public void ReplayCount_Complete()
  1658. {
  1659. var scheduler = new TestScheduler();
  1660. var xs = scheduler.CreateHotObservable(
  1661. OnNext(110, 7),
  1662. OnNext(220, 3),
  1663. OnNext(280, 4),
  1664. OnNext(290, 1),
  1665. OnNext(340, 8),
  1666. OnNext(360, 5),
  1667. OnNext(370, 6),
  1668. OnNext(390, 7),
  1669. OnNext(410, 13),
  1670. OnNext(430, 2),
  1671. OnNext(450, 9),
  1672. OnNext(520, 11),
  1673. OnNext(560, 20),
  1674. OnCompleted<int>(600)
  1675. );
  1676. var ys = default(IConnectableObservable<int>);
  1677. var subscription = default(IDisposable);
  1678. var connection = default(IDisposable);
  1679. var res = scheduler.CreateObserver<int>();
  1680. scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(3, scheduler));
  1681. scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
  1682. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1683. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1684. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1685. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1686. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1687. scheduler.Start();
  1688. res.Messages.AssertEqual(
  1689. OnNext(451, 5),
  1690. OnNext(452, 6),
  1691. OnNext(453, 7),
  1692. OnNext(521, 11),
  1693. OnNext(561, 20),
  1694. OnCompleted<int>(601)
  1695. );
  1696. xs.Subscriptions.AssertEqual(
  1697. Subscribe(300, 400),
  1698. Subscribe(500, 600)
  1699. );
  1700. }
  1701. [Fact]
  1702. public void ReplayCount_Dispose()
  1703. {
  1704. var scheduler = new TestScheduler();
  1705. var xs = scheduler.CreateHotObservable(
  1706. OnNext(110, 7),
  1707. OnNext(220, 3),
  1708. OnNext(280, 4),
  1709. OnNext(290, 1),
  1710. OnNext(340, 8),
  1711. OnNext(360, 5),
  1712. OnNext(370, 6),
  1713. OnNext(390, 7),
  1714. OnNext(410, 13),
  1715. OnNext(430, 2),
  1716. OnNext(450, 9),
  1717. OnNext(520, 11),
  1718. OnNext(560, 20),
  1719. OnCompleted<int>(600)
  1720. );
  1721. var ys = default(IConnectableObservable<int>);
  1722. var subscription = default(IDisposable);
  1723. var connection = default(IDisposable);
  1724. var res = scheduler.CreateObserver<int>();
  1725. scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(3, scheduler));
  1726. scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
  1727. scheduler.ScheduleAbsolute(475, () => subscription.Dispose());
  1728. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1729. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1730. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1731. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  1732. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  1733. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1734. scheduler.Start();
  1735. res.Messages.AssertEqual(
  1736. OnNext(451, 5),
  1737. OnNext(452, 6),
  1738. OnNext(453, 7)
  1739. );
  1740. xs.Subscriptions.AssertEqual(
  1741. Subscribe(300, 400),
  1742. Subscribe(500, 550),
  1743. Subscribe(650, 800)
  1744. );
  1745. }
  1746. [Fact]
  1747. public void ReplayCount_MultipleConnections()
  1748. {
  1749. var xs = Observable.Never<int>();
  1750. var ys = xs.Replay(3, new TestScheduler());
  1751. var connection1 = ys.Connect();
  1752. var connection2 = ys.Connect();
  1753. Assert.Same(connection1, connection2);
  1754. connection1.Dispose();
  1755. connection2.Dispose();
  1756. var connection3 = ys.Connect();
  1757. Assert.NotSame(connection1, connection3);
  1758. connection3.Dispose();
  1759. }
  1760. [Fact]
  1761. public void ReplayCountLambda_Zip_Complete()
  1762. {
  1763. var scheduler = new TestScheduler();
  1764. var xs = scheduler.CreateHotObservable(
  1765. OnNext(110, 7),
  1766. OnNext(220, 3),
  1767. OnNext(280, 4),
  1768. OnNext(290, 1),
  1769. OnNext(340, 8),
  1770. OnNext(360, 5),
  1771. OnNext(370, 6),
  1772. OnNext(390, 7),
  1773. OnNext(410, 13),
  1774. OnNext(430, 2),
  1775. OnNext(450, 9),
  1776. OnNext(520, 11),
  1777. OnNext(560, 20),
  1778. OnCompleted<int>(600)
  1779. );
  1780. var res = scheduler.Start(() =>
  1781. xs.Replay(_xs => _xs.Take(6).Repeat(), 3, scheduler),
  1782. 610
  1783. );
  1784. res.Messages.AssertEqual(
  1785. OnNext(221, 3),
  1786. OnNext(281, 4),
  1787. OnNext(291, 1),
  1788. OnNext(341, 8),
  1789. OnNext(361, 5),
  1790. OnNext(371, 6),
  1791. OnNext(372, 8),
  1792. OnNext(373, 5),
  1793. OnNext(374, 6),
  1794. OnNext(391, 7),
  1795. OnNext(411, 13),
  1796. OnNext(431, 2),
  1797. OnNext(432, 7),
  1798. OnNext(433, 13),
  1799. OnNext(434, 2),
  1800. OnNext(451, 9),
  1801. OnNext(521, 11),
  1802. OnNext(561, 20),
  1803. OnNext(562, 9),
  1804. OnNext(563, 11),
  1805. OnNext(564, 20),
  1806. OnNext(602, 9),
  1807. OnNext(603, 11),
  1808. OnNext(604, 20),
  1809. OnNext(606, 9),
  1810. OnNext(607, 11),
  1811. OnNext(608, 20)
  1812. );
  1813. xs.Subscriptions.AssertEqual(
  1814. Subscribe(200, 600)
  1815. );
  1816. }
  1817. [Fact]
  1818. public void ReplayCountLambda_Zip_Error()
  1819. {
  1820. var scheduler = new TestScheduler();
  1821. var ex = new Exception();
  1822. var xs = scheduler.CreateHotObservable(
  1823. OnNext(110, 7),
  1824. OnNext(220, 3),
  1825. OnNext(280, 4),
  1826. OnNext(290, 1),
  1827. OnNext(340, 8),
  1828. OnNext(360, 5),
  1829. OnNext(370, 6),
  1830. OnNext(390, 7),
  1831. OnNext(410, 13),
  1832. OnNext(430, 2),
  1833. OnNext(450, 9),
  1834. OnNext(520, 11),
  1835. OnNext(560, 20),
  1836. OnError<int>(600, ex)
  1837. );
  1838. var res = scheduler.Start(() =>
  1839. xs.Replay(_xs => _xs.Take(6).Repeat(), 3, scheduler)
  1840. );
  1841. res.Messages.AssertEqual(
  1842. OnNext(221, 3),
  1843. OnNext(281, 4),
  1844. OnNext(291, 1),
  1845. OnNext(341, 8),
  1846. OnNext(361, 5),
  1847. OnNext(371, 6),
  1848. OnNext(372, 8),
  1849. OnNext(373, 5),
  1850. OnNext(374, 6),
  1851. OnNext(391, 7),
  1852. OnNext(411, 13),
  1853. OnNext(431, 2),
  1854. OnNext(432, 7),
  1855. OnNext(433, 13),
  1856. OnNext(434, 2),
  1857. OnNext(451, 9),
  1858. OnNext(521, 11),
  1859. OnNext(561, 20),
  1860. OnNext(562, 9),
  1861. OnNext(563, 11),
  1862. OnNext(564, 20),
  1863. OnError<int>(601, ex)
  1864. );
  1865. xs.Subscriptions.AssertEqual(
  1866. Subscribe(200, 600)
  1867. );
  1868. }
  1869. [Fact]
  1870. public void ReplayCountLambda_Zip_Dispose()
  1871. {
  1872. var scheduler = new TestScheduler();
  1873. var xs = scheduler.CreateHotObservable(
  1874. OnNext(110, 7),
  1875. OnNext(220, 3),
  1876. OnNext(280, 4),
  1877. OnNext(290, 1),
  1878. OnNext(340, 8),
  1879. OnNext(360, 5),
  1880. OnNext(370, 6),
  1881. OnNext(390, 7),
  1882. OnNext(410, 13),
  1883. OnNext(430, 2),
  1884. OnNext(450, 9),
  1885. OnNext(520, 11),
  1886. OnNext(560, 20),
  1887. OnCompleted<int>(600)
  1888. );
  1889. var res = scheduler.Start(() =>
  1890. xs.Replay(_xs => _xs.Take(6).Repeat(), 3, scheduler),
  1891. 470
  1892. );
  1893. res.Messages.AssertEqual(
  1894. OnNext(221, 3),
  1895. OnNext(281, 4),
  1896. OnNext(291, 1),
  1897. OnNext(341, 8),
  1898. OnNext(361, 5),
  1899. OnNext(371, 6),
  1900. OnNext(372, 8),
  1901. OnNext(373, 5),
  1902. OnNext(374, 6),
  1903. OnNext(391, 7),
  1904. OnNext(411, 13),
  1905. OnNext(431, 2),
  1906. OnNext(432, 7),
  1907. OnNext(433, 13),
  1908. OnNext(434, 2),
  1909. OnNext(451, 9)
  1910. );
  1911. xs.Subscriptions.AssertEqual(
  1912. Subscribe(200, 470)
  1913. );
  1914. }
  1915. [Fact]
  1916. public void ReplayTime_Basic()
  1917. {
  1918. var scheduler = new TestScheduler();
  1919. var xs = scheduler.CreateHotObservable(
  1920. OnNext(110, 7),
  1921. OnNext(220, 3),
  1922. OnNext(280, 4),
  1923. OnNext(290, 1),
  1924. OnNext(340, 8),
  1925. OnNext(360, 5),
  1926. OnNext(370, 6),
  1927. OnNext(390, 7),
  1928. OnNext(410, 13),
  1929. OnNext(430, 2),
  1930. OnNext(450, 9),
  1931. OnNext(520, 11),
  1932. OnNext(560, 20),
  1933. OnCompleted<int>(600)
  1934. );
  1935. var ys = default(IConnectableObservable<int>);
  1936. var subscription = default(IDisposable);
  1937. var connection = default(IDisposable);
  1938. var res = scheduler.CreateObserver<int>();
  1939. scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(TimeSpan.FromTicks(150), scheduler));
  1940. scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
  1941. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1942. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1943. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1944. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1945. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  1946. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  1947. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1948. scheduler.Start();
  1949. res.Messages.AssertEqual(
  1950. OnNext(451, 8),
  1951. OnNext(452, 5),
  1952. OnNext(453, 6),
  1953. OnNext(454, 7),
  1954. OnNext(521, 11)
  1955. );
  1956. xs.Subscriptions.AssertEqual(
  1957. Subscribe(300, 400),
  1958. Subscribe(500, 550),
  1959. Subscribe(650, 800)
  1960. );
  1961. }
  1962. [Fact]
  1963. public void ReplayTime_Error()
  1964. {
  1965. var scheduler = new TestScheduler();
  1966. var ex = new Exception();
  1967. var xs = scheduler.CreateHotObservable(
  1968. OnNext(110, 7),
  1969. OnNext(220, 3),
  1970. OnNext(280, 4),
  1971. OnNext(290, 1),
  1972. OnNext(340, 8),
  1973. OnNext(360, 5),
  1974. OnNext(370, 6),
  1975. OnNext(390, 7),
  1976. OnNext(410, 13),
  1977. OnNext(430, 2),
  1978. OnNext(450, 9),
  1979. OnNext(520, 11),
  1980. OnNext(560, 20),
  1981. OnError<int>(600, ex)
  1982. );
  1983. var ys = default(IConnectableObservable<int>);
  1984. var subscription = default(IDisposable);
  1985. var connection = default(IDisposable);
  1986. var res = scheduler.CreateObserver<int>();
  1987. scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(TimeSpan.FromTicks(75), scheduler));
  1988. scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
  1989. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1990. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  1991. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  1992. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  1993. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  1994. scheduler.Start();
  1995. res.Messages.AssertEqual(
  1996. OnNext(451, 7),
  1997. OnNext(521, 11),
  1998. OnNext(561, 20),
  1999. OnError<int>(601, ex)
  2000. );
  2001. xs.Subscriptions.AssertEqual(
  2002. Subscribe(300, 400),
  2003. Subscribe(500, 600)
  2004. );
  2005. }
  2006. [Fact]
  2007. public void ReplayTime_Complete()
  2008. {
  2009. var scheduler = new TestScheduler();
  2010. var xs = scheduler.CreateHotObservable(
  2011. OnNext(110, 7),
  2012. OnNext(220, 3),
  2013. OnNext(280, 4),
  2014. OnNext(290, 1),
  2015. OnNext(340, 8),
  2016. OnNext(360, 5),
  2017. OnNext(370, 6),
  2018. OnNext(390, 7),
  2019. OnNext(410, 13),
  2020. OnNext(430, 2),
  2021. OnNext(450, 9),
  2022. OnNext(520, 11),
  2023. OnNext(560, 20),
  2024. OnCompleted<int>(600)
  2025. );
  2026. var ys = default(IConnectableObservable<int>);
  2027. var subscription = default(IDisposable);
  2028. var connection = default(IDisposable);
  2029. var res = scheduler.CreateObserver<int>();
  2030. scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(TimeSpan.FromTicks(85), scheduler));
  2031. scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
  2032. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  2033. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  2034. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  2035. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  2036. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  2037. scheduler.Start();
  2038. res.Messages.AssertEqual(
  2039. OnNext(451, 6),
  2040. OnNext(452, 7),
  2041. OnNext(521, 11),
  2042. OnNext(561, 20),
  2043. OnCompleted<int>(601)
  2044. );
  2045. xs.Subscriptions.AssertEqual(
  2046. Subscribe(300, 400),
  2047. Subscribe(500, 600)
  2048. );
  2049. }
  2050. [Fact]
  2051. public void ReplayTime_Dispose()
  2052. {
  2053. var scheduler = new TestScheduler();
  2054. var xs = scheduler.CreateHotObservable(
  2055. OnNext(110, 7),
  2056. OnNext(220, 3),
  2057. OnNext(280, 4),
  2058. OnNext(290, 1),
  2059. OnNext(340, 8),
  2060. OnNext(360, 5),
  2061. OnNext(370, 6),
  2062. OnNext(390, 7),
  2063. OnNext(410, 13),
  2064. OnNext(430, 2),
  2065. OnNext(450, 9),
  2066. OnNext(520, 11),
  2067. OnNext(560, 20),
  2068. OnCompleted<int>(600)
  2069. );
  2070. var ys = default(IConnectableObservable<int>);
  2071. var subscription = default(IDisposable);
  2072. var connection = default(IDisposable);
  2073. var res = scheduler.CreateObserver<int>();
  2074. scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(TimeSpan.FromTicks(100), scheduler));
  2075. scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
  2076. scheduler.ScheduleAbsolute(475, () => subscription.Dispose());
  2077. scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
  2078. scheduler.ScheduleAbsolute(400, () => connection.Dispose());
  2079. scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
  2080. scheduler.ScheduleAbsolute(550, () => connection.Dispose());
  2081. scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
  2082. scheduler.ScheduleAbsolute(800, () => connection.Dispose());
  2083. scheduler.Start();
  2084. res.Messages.AssertEqual(
  2085. OnNext(451, 5),
  2086. OnNext(452, 6),
  2087. OnNext(453, 7)
  2088. );
  2089. xs.Subscriptions.AssertEqual(
  2090. Subscribe(300, 400),
  2091. Subscribe(500, 550),
  2092. Subscribe(650, 800)
  2093. );
  2094. }
  2095. [Fact]
  2096. public void ReplayTime_MultipleConnections()
  2097. {
  2098. var xs = Observable.Never<int>();
  2099. var ys = xs.Replay(TimeSpan.FromTicks(100), new TestScheduler());
  2100. var connection1 = ys.Connect();
  2101. var connection2 = ys.Connect();
  2102. Assert.Same(connection1, connection2);
  2103. connection1.Dispose();
  2104. connection2.Dispose();
  2105. var connection3 = ys.Connect();
  2106. Assert.NotSame(connection1, connection3);
  2107. connection3.Dispose();
  2108. }
  2109. [Fact]
  2110. public void ReplayTimeLambda_Zip_Complete()
  2111. {
  2112. var scheduler = new TestScheduler();
  2113. var xs = scheduler.CreateHotObservable(
  2114. OnNext(110, 7),
  2115. OnNext(220, 3),
  2116. OnNext(280, 4),
  2117. OnNext(290, 1),
  2118. OnNext(340, 8),
  2119. OnNext(360, 5),
  2120. OnNext(370, 6),
  2121. OnNext(390, 7),
  2122. OnNext(410, 13),
  2123. OnNext(430, 2),
  2124. OnNext(450, 9),
  2125. OnNext(520, 11),
  2126. OnNext(560, 20),
  2127. OnCompleted<int>(600)
  2128. );
  2129. var res = scheduler.Start(() =>
  2130. xs.Replay(_xs => _xs.Take(6).Repeat(), TimeSpan.FromTicks(50), scheduler),
  2131. 610
  2132. );
  2133. res.Messages.AssertEqual(
  2134. OnNext(221, 3),
  2135. OnNext(281, 4),
  2136. OnNext(291, 1),
  2137. OnNext(341, 8),
  2138. OnNext(361, 5),
  2139. OnNext(371, 6),
  2140. OnNext(372, 8),
  2141. OnNext(373, 5),
  2142. OnNext(374, 6),
  2143. OnNext(391, 7),
  2144. OnNext(411, 13),
  2145. OnNext(431, 2),
  2146. OnNext(432, 7),
  2147. OnNext(433, 13),
  2148. OnNext(434, 2),
  2149. OnNext(451, 9),
  2150. OnNext(521, 11),
  2151. OnNext(561, 20),
  2152. OnNext(562, 11),
  2153. OnNext(563, 20),
  2154. OnNext(602, 20),
  2155. OnNext(604, 20),
  2156. OnNext(606, 20),
  2157. OnNext(608, 20)
  2158. );
  2159. xs.Subscriptions.AssertEqual(
  2160. Subscribe(200, 600)
  2161. );
  2162. }
  2163. [Fact]
  2164. public void ReplayTimeLambda_Zip_Error()
  2165. {
  2166. var scheduler = new TestScheduler();
  2167. var ex = new Exception();
  2168. var xs = scheduler.CreateHotObservable(
  2169. OnNext(110, 7),
  2170. OnNext(220, 3),
  2171. OnNext(280, 4),
  2172. OnNext(290, 1),
  2173. OnNext(340, 8),
  2174. OnNext(360, 5),
  2175. OnNext(370, 6),
  2176. OnNext(390, 7),
  2177. OnNext(410, 13),
  2178. OnNext(430, 2),
  2179. OnNext(450, 9),
  2180. OnNext(520, 11),
  2181. OnNext(560, 20),
  2182. OnError<int>(600, ex)
  2183. );
  2184. var res = scheduler.Start(() =>
  2185. xs.Replay(_xs => _xs.Take(6).Repeat(), TimeSpan.FromTicks(50), scheduler)
  2186. );
  2187. res.Messages.AssertEqual(
  2188. OnNext(221, 3),
  2189. OnNext(281, 4),
  2190. OnNext(291, 1),
  2191. OnNext(341, 8),
  2192. OnNext(361, 5),
  2193. OnNext(371, 6),
  2194. OnNext(372, 8),
  2195. OnNext(373, 5),
  2196. OnNext(374, 6),
  2197. OnNext(391, 7),
  2198. OnNext(411, 13),
  2199. OnNext(431, 2),
  2200. OnNext(432, 7),
  2201. OnNext(433, 13),
  2202. OnNext(434, 2),
  2203. OnNext(451, 9),
  2204. OnNext(521, 11),
  2205. OnNext(561, 20),
  2206. OnNext(562, 11),
  2207. OnNext(563, 20),
  2208. OnError<int>(601, ex)
  2209. );
  2210. xs.Subscriptions.AssertEqual(
  2211. Subscribe(200, 600)
  2212. );
  2213. }
  2214. [Fact]
  2215. public void ReplayTimeLambda_Zip_Dispose()
  2216. {
  2217. var scheduler = new TestScheduler();
  2218. var xs = scheduler.CreateHotObservable(
  2219. OnNext(110, 7),
  2220. OnNext(220, 3),
  2221. OnNext(280, 4),
  2222. OnNext(290, 1),
  2223. OnNext(340, 8),
  2224. OnNext(360, 5),
  2225. OnNext(370, 6),
  2226. OnNext(390, 7),
  2227. OnNext(410, 13),
  2228. OnNext(430, 2),
  2229. OnNext(450, 9),
  2230. OnNext(520, 11),
  2231. OnNext(560, 20),
  2232. OnCompleted<int>(600)
  2233. );
  2234. var res = scheduler.Start(() =>
  2235. xs.Replay(_xs => _xs.Take(6).Repeat(), TimeSpan.FromTicks(50), scheduler),
  2236. 470
  2237. );
  2238. res.Messages.AssertEqual(
  2239. OnNext(221, 3),
  2240. OnNext(281, 4),
  2241. OnNext(291, 1),
  2242. OnNext(341, 8),
  2243. OnNext(361, 5),
  2244. OnNext(371, 6),
  2245. OnNext(372, 8),
  2246. OnNext(373, 5),
  2247. OnNext(374, 6),
  2248. OnNext(391, 7),
  2249. OnNext(411, 13),
  2250. OnNext(431, 2),
  2251. OnNext(432, 7),
  2252. OnNext(433, 13),
  2253. OnNext(434, 2),
  2254. OnNext(451, 9)
  2255. );
  2256. xs.Subscriptions.AssertEqual(
  2257. Subscribe(200, 470)
  2258. );
  2259. }
  2260. [Fact]
  2261. public void Replay_Default1()
  2262. {
  2263. var s = new Subject<int>();
  2264. var xs = s.Replay(100, DefaultScheduler.Instance);
  2265. var ys = s.Replay(100);
  2266. xs.Connect();
  2267. ys.Connect();
  2268. s.OnNext(1);
  2269. s.OnNext(2);
  2270. s.OnCompleted();
  2271. xs.AssertEqual(ys);
  2272. }
  2273. [Fact]
  2274. public void Replay_Default2()
  2275. {
  2276. var s = new Subject<int>();
  2277. var xs = s.Replay(TimeSpan.FromHours(1), DefaultScheduler.Instance);
  2278. var ys = s.Replay(TimeSpan.FromHours(1));
  2279. xs.Connect();
  2280. ys.Connect();
  2281. s.OnNext(1);
  2282. s.OnNext(2);
  2283. s.OnCompleted();
  2284. xs.AssertEqual(ys);
  2285. }
  2286. [Fact]
  2287. public void Replay_Default3()
  2288. {
  2289. var s = new Subject<int>();
  2290. var xs = s.Replay(100, TimeSpan.FromHours(1), DefaultScheduler.Instance);
  2291. var ys = s.Replay(100, TimeSpan.FromHours(1));
  2292. xs.Connect();
  2293. ys.Connect();
  2294. s.OnNext(1);
  2295. s.OnNext(2);
  2296. s.OnCompleted();
  2297. xs.AssertEqual(ys);
  2298. }
  2299. [Fact]
  2300. public void Replay_Default4()
  2301. {
  2302. var s = new Subject<int>();
  2303. var xs = s.Replay(DefaultScheduler.Instance);
  2304. var ys = s.Replay();
  2305. xs.Connect();
  2306. ys.Connect();
  2307. s.OnNext(1);
  2308. s.OnNext(2);
  2309. s.OnCompleted();
  2310. xs.AssertEqual(ys);
  2311. }
  2312. [Fact]
  2313. public void ReplayLambda_Default1()
  2314. {
  2315. var xs = Observable.Range(1, 10).Replay(_xs => _xs, 100, DefaultScheduler.Instance);
  2316. var ys = Observable.Range(1, 10).Replay(_xs => _xs, 100);
  2317. xs.AssertEqual(ys);
  2318. }
  2319. [Fact]
  2320. public void ReplayLambda_Default2()
  2321. {
  2322. var xs = Observable.Range(1, 10).Replay(_xs => _xs, TimeSpan.FromHours(1), DefaultScheduler.Instance);
  2323. var ys = Observable.Range(1, 10).Replay(_xs => _xs, TimeSpan.FromHours(1));
  2324. xs.AssertEqual(ys);
  2325. }
  2326. [Fact]
  2327. public void ReplayLambda_Default3()
  2328. {
  2329. var xs = Observable.Range(1, 10).Replay(_xs => _xs, 100, TimeSpan.FromHours(1), DefaultScheduler.Instance);
  2330. var ys = Observable.Range(1, 10).Replay(_xs => _xs, 100, TimeSpan.FromHours(1));
  2331. xs.AssertEqual(ys);
  2332. }
  2333. [Fact]
  2334. public void ReplayLambda_Default4()
  2335. {
  2336. var xs = Observable.Range(1, 10).Replay(_xs => _xs, DefaultScheduler.Instance);
  2337. var ys = Observable.Range(1, 10).Replay(_xs => _xs);
  2338. xs.AssertEqual(ys);
  2339. }
  2340. #endregion
  2341. }
  2342. }