PublishTest.cs 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using Microsoft.Reactive.Testing;
  10. using Xunit;
  11. namespace ReactiveTests.Tests
  12. {
  13. public class PublishTest : ReactiveTest
  14. {
  // Publishes a source through a selector that zips the shared sequence with
  // itself. The expected output is each source value from time 210 onwards
  // added to itself, followed by completion at 390.
  // Note: despite "Cold" in the name, the source is built with CreateHotObservable.
  [Fact]
  public void Publish_Cold_Zip()
  {
      TestScheduler testScheduler = new TestScheduler();

      ITestableObservable<int> source = testScheduler.CreateHotObservable(
          OnNext(40, 0), OnNext(90, 1), OnNext(150, 2),
          OnNext(210, 3), OnNext(240, 4), OnNext(270, 5),
          OnNext(330, 6), OnNext(340, 7),
          OnCompleted<int>(390)
      );

      ITestableObserver<int> observed = testScheduler.Start(() =>
      {
          return source.Publish(shared => shared.Zip(shared, (left, right) => left + right));
      });

      observed.Messages.AssertEqual(
          OnNext(210, 6), OnNext(240, 8), OnNext(270, 10),
          OnNext(330, 12), OnNext(340, 14),
          OnCompleted<int>(390)
      );

      // The selector uses the shared sequence twice, yet only one
      // subscription to the underlying source is expected.
      source.Subscriptions.AssertEqual(Subscribe(200, 390));
  }
  // Checks that the Publish overloads without an initial value reject null
  // arguments with ArgumentNullException: a null source (with and without a
  // selector) and a null selector.
  [Fact]
  public void Publish_ArgumentChecking()
  {
      IObservable<int> validSource = Observable.Empty<int>();
      IObservable<int> nullSource = null;

      // Null source, no selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(nullSource));

      // Null source, valid selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(nullSource, shared => shared));

      // Valid source, null selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish<int, int>(validSource, null));
  }
  // Exercises xs.Publish() with three connect/disconnect windows
  // (300-400, 500-550, 650-800) while a single observer stays subscribed from
  // Subscribed until Disposed. The expected messages are the source values at
  // 340, 360, 370, 390 and 520, and the source is expected to be subscribed
  // exactly during the three windows.
  [Fact]
  public void Publish_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Helpers so the connection timeline below reads as a list of windows.
      void ConnectAt(long time) => testScheduler.ScheduleAbsolute(time, () => connection = published.Connect());
      void DisconnectAt(long time) => testScheduler.ScheduleAbsolute(time, () => connection.Dispose());

      testScheduler.ScheduleAbsolute(Created, () => published = source.Publish());
      testScheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      ConnectAt(300);
      DisconnectAt(400);
      ConnectAt(500);
      DisconnectAt(550);
      ConnectAt(650);
      DisconnectAt(800);

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Exercises xs.Publish() over a source that fails at 600. Two connections
  // are made (300-400 and from 500, with disposal scheduled at 800). The
  // observer is expected to receive the values seen while connected and then
  // the error at 600; the second source subscription is expected to end at
  // 600 rather than at the scheduled disposal time.
  [Fact]
  public void Publish_Error()
  {
      TestScheduler testScheduler = new TestScheduler();
      Exception error = new Exception();

      ITestableObservable<int> source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      ITestableObserver<int> observer = testScheduler.CreateObserver<int>();

      // Observer lifetime.
      testScheduler.ScheduleAbsolute(Created, () => { published = source.Publish(); });
      testScheduler.ScheduleAbsolute(Subscribed, () => { subscription = published.Subscribe(observer); });
      testScheduler.ScheduleAbsolute(Disposed, () => { subscription.Dispose(); });

      // Connection windows.
      testScheduler.ScheduleAbsolute(300, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(400, () => { connection.Dispose(); });
      testScheduler.ScheduleAbsolute(500, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(800, () => { connection.Dispose(); });

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(520, 11), OnNext(560, 20),
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Exercises xs.Publish() over a source that completes at 600. Two
  // connections are made (300-400 and from 500, with disposal scheduled at
  // 800). The observer is expected to receive the values seen while connected
  // followed by OnCompleted at 600, and the second source subscription is
  // expected to end at 600.
  [Fact]
  public void Publish_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable observerSubscription = null;
      IDisposable activeConnection = null;
      var sink = testScheduler.CreateObserver<int>();

      testScheduler.ScheduleAbsolute(Created, () => published = source.Publish());
      testScheduler.ScheduleAbsolute(Subscribed, () => observerSubscription = published.Subscribe(sink));
      testScheduler.ScheduleAbsolute(Disposed, () => observerSubscription.Dispose());

      // First window: 300-400.
      testScheduler.ScheduleAbsolute(300, () => activeConnection = published.Connect());
      testScheduler.ScheduleAbsolute(400, () => activeConnection.Dispose());

      // Second window opens at 500; disposal is scheduled after the source completes.
      testScheduler.ScheduleAbsolute(500, () => activeConnection = published.Connect());
      testScheduler.ScheduleAbsolute(800, () => activeConnection.Dispose());

      testScheduler.Start();

      sink.Messages.AssertEqual(
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Exercises xs.Publish() when the observer unsubscribes at 350, in the
  // middle of the first connection window (300-400). Only the value at 340 is
  // expected to reach the observer, while the source is still expected to be
  // subscribed during all three connection windows.
  [Fact]
  public void Publish_Dispose()
  {
      TestScheduler testScheduler = new TestScheduler();

      ITestableObservable<int> source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      ITestableObserver<int> observer = testScheduler.CreateObserver<int>();

      // Actions are listed chronologically; every due time is distinct, so the
      // scheduler runs them in this order regardless of registration order.
      testScheduler.ScheduleAbsolute(Created, () => { published = source.Publish(); });
      testScheduler.ScheduleAbsolute(Subscribed, () => { subscription = published.Subscribe(observer); });
      testScheduler.ScheduleAbsolute(300, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(350, () => { subscription.Dispose(); });
      testScheduler.ScheduleAbsolute(400, () => { connection.Dispose(); });
      testScheduler.ScheduleAbsolute(500, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(550, () => { connection.Dispose(); });
      testScheduler.ScheduleAbsolute(650, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(800, () => { connection.Dispose(); });

      testScheduler.Start();

      observer.Messages.AssertEqual(OnNext(340, 8));

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Checks connection identity on a published Never sequence: two Connect
  // calls without an intervening Dispose must return the same object, and a
  // Connect after disposal must return a different one.
  [Fact]
  public void Publish_MultipleConnections()
  {
      IObservable<int> never = Observable.Never<int>();
      IConnectableObservable<int> published = never.Publish();

      IDisposable first = published.Connect();
      IDisposable second = published.Connect();
      Assert.Same(first, second);

      // Both handles are disposed, exactly as in the original sequence.
      first.Dispose();
      second.Dispose();

      IDisposable third = published.Connect();
      Assert.NotSame(first, third);

      third.Dispose();
  }
  // Publishes a completing source through a selector that zips the shared
  // sequence with itself skipped by one, summing each pair. Expected output
  // starts at 280 (the second value seen after subscription) and completes at
  // 600, with a single source subscription from 200 to 600.
  [Fact]
  public void PublishLambda_Zip_Complete()
  {
      TestScheduler testScheduler = new TestScheduler();

      ITestableObservable<int> source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      ITestableObserver<int> observed = testScheduler.Start(() =>
      {
          return source.Publish(shared =>
              shared.Zip(shared.Skip(1), (previous, current) => current + previous));
      });

      observed.Messages.AssertEqual(
          OnNext(280, 7), OnNext(290, 5), OnNext(340, 9), OnNext(360, 13),
          OnNext(370, 11), OnNext(390, 13), OnNext(410, 20), OnNext(430, 15),
          OnNext(450, 11), OnNext(520, 20), OnNext(560, 31),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Same pairwise-sum selector as the completing variant, but the source
  // fails at 600. The same exception instance is expected to reach the
  // observer at 600, and the source subscription is expected to span 200-600.
  [Fact]
  public void PublishLambda_Zip_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      var observed = testScheduler.Start(
          () => source.Publish(
              shared => shared.Zip(shared.Skip(1), (previous, current) => current + previous)));

      observed.Messages.AssertEqual(
          OnNext(280, 7), OnNext(290, 5), OnNext(340, 9), OnNext(360, 13),
          OnNext(370, 11), OnNext(390, 13), OnNext(410, 20), OnNext(430, 15),
          OnNext(450, 11), OnNext(520, 20), OnNext(560, 31),
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Same pairwise-sum selector, but the run is started with a disposal time
  // of 470. Output is expected to stop after the value at 450, with no
  // terminal notification, and the source subscription is expected to span
  // 200-470.
  [Fact]
  public void PublishLambda_Zip_Dispose()
  {
      TestScheduler testScheduler = new TestScheduler();

      ITestableObservable<int> source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      Func<IObservable<int>> create = () =>
          source.Publish(shared => shared.Zip(shared.Skip(1), (previous, current) => current + previous));

      ITestableObserver<int> observed = testScheduler.Start(create, 470);

      observed.Messages.AssertEqual(
          OnNext(280, 7), OnNext(290, 5), OnNext(340, 9),
          OnNext(360, 13), OnNext(370, 11), OnNext(390, 13),
          OnNext(410, 20), OnNext(430, 15), OnNext(450, 11)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 470));
  }
  // Checks that the Publish overloads taking an initial value reject null
  // arguments with ArgumentNullException: a null source (with and without a
  // selector) and a null selector.
  [Fact]
  public void PublishWithInitialValue_ArgumentChecking()
  {
      IObservable<int> validSource = Observable.Empty<int>();
      IObservable<int> nullSource = null;
      Func<IObservable<int>, IObservable<int>> nullSelector = null;

      // Null source, initial value only.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(nullSource, 1));

      // Null source, valid selector, initial value.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(nullSource, shared => shared, 1));

      // Valid source, null selector, initial value.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(validSource, nullSelector, 1));
  }
  // Sanity check for Publish with a selector and an initial value: publishing
  // Range(1, 10) through an identity selector with initial value 0 is expected
  // to produce the same sequence as Range(0, 11), i.e. the initial value
  // followed by the source values.
  [Fact]
  public void PublishWithInitialValue_SanityCheck()
  {
      // The original declared an unused Observable.Empty<int>() local here; it
      // took no part in the assertion and has been removed.
      Observable.Publish(Observable.Range(1, 10), x => x, 0).AssertEqual(Observable.Range(0, 11));
  }
  // Exercises xs.Publish(1979) with three connect/disconnect windows
  // (300-400, 500-550, 650-800) while a single observer stays subscribed from
  // Subscribed until Disposed. The observer is expected to receive the initial
  // value 1979 at time 200, before any connection, followed by the source
  // values at 340, 360, 370, 390 and 520.
  [Fact]
  public void PublishWithInitialValue_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = testScheduler.CreateObserver<int>();

      // Helpers so the connection timeline below reads as a list of windows.
      void ConnectAt(long time) => testScheduler.ScheduleAbsolute(time, () => connection = published.Connect());
      void DisconnectAt(long time) => testScheduler.ScheduleAbsolute(time, () => connection.Dispose());

      testScheduler.ScheduleAbsolute(Created, () => published = source.Publish(1979));
      testScheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      ConnectAt(300);
      DisconnectAt(400);
      ConnectAt(500);
      DisconnectAt(550);
      ConnectAt(650);
      DisconnectAt(800);

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(200, 1979),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Exercises xs.Publish(1979) over a source that fails at 600. Two
  // connections are made (300-400 and from 500, with disposal scheduled at
  // 800). The observer is expected to receive the initial value at 200, the
  // values seen while connected, and then the error at 600; the second source
  // subscription is expected to end at 600.
  [Fact]
  public void PublishWithInitialValue_Error()
  {
      TestScheduler testScheduler = new TestScheduler();
      Exception error = new Exception();

      ITestableObservable<int> source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      ITestableObserver<int> observer = testScheduler.CreateObserver<int>();

      // Observer lifetime.
      testScheduler.ScheduleAbsolute(Created, () => { published = source.Publish(1979); });
      testScheduler.ScheduleAbsolute(Subscribed, () => { subscription = published.Subscribe(observer); });
      testScheduler.ScheduleAbsolute(Disposed, () => { subscription.Dispose(); });

      // Connection windows.
      testScheduler.ScheduleAbsolute(300, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(400, () => { connection.Dispose(); });
      testScheduler.ScheduleAbsolute(500, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(800, () => { connection.Dispose(); });

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(200, 1979),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(520, 11), OnNext(560, 20),
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Exercises xs.Publish(1979) over a source that completes at 600. Two
  // connections are made (300-400 and from 500, with disposal scheduled at
  // 800). The observer is expected to receive the initial value at 200, the
  // values seen while connected, and OnCompleted at 600; the second source
  // subscription is expected to end at 600.
  [Fact]
  public void PublishWithInitialValue_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable observerSubscription = null;
      IDisposable activeConnection = null;
      var sink = testScheduler.CreateObserver<int>();

      testScheduler.ScheduleAbsolute(Created, () => published = source.Publish(1979));
      testScheduler.ScheduleAbsolute(Subscribed, () => observerSubscription = published.Subscribe(sink));
      testScheduler.ScheduleAbsolute(Disposed, () => observerSubscription.Dispose());

      // First window: 300-400.
      testScheduler.ScheduleAbsolute(300, () => activeConnection = published.Connect());
      testScheduler.ScheduleAbsolute(400, () => activeConnection.Dispose());

      // Second window opens at 500; disposal is scheduled after the source completes.
      testScheduler.ScheduleAbsolute(500, () => activeConnection = published.Connect());
      testScheduler.ScheduleAbsolute(800, () => activeConnection.Dispose());

      testScheduler.Start();

      sink.Messages.AssertEqual(
          OnNext(200, 1979),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Exercises xs.Publish(1979) when the observer unsubscribes at 350, in the
  // middle of the first connection window (300-400). The observer is expected
  // to receive only the initial value at 200 and the source value at 340,
  // while the source is still expected to be subscribed during all three
  // connection windows.
  [Fact]
  public void PublishWithInitialValue_Dispose()
  {
      TestScheduler testScheduler = new TestScheduler();

      ITestableObservable<int> source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      ITestableObserver<int> observer = testScheduler.CreateObserver<int>();

      // Actions are listed chronologically; every due time is distinct, so the
      // scheduler runs them in this order regardless of registration order.
      testScheduler.ScheduleAbsolute(Created, () => { published = source.Publish(1979); });
      testScheduler.ScheduleAbsolute(Subscribed, () => { subscription = published.Subscribe(observer); });
      testScheduler.ScheduleAbsolute(300, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(350, () => { subscription.Dispose(); });
      testScheduler.ScheduleAbsolute(400, () => { connection.Dispose(); });
      testScheduler.ScheduleAbsolute(500, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(550, () => { connection.Dispose(); });
      testScheduler.ScheduleAbsolute(650, () => { connection = published.Connect(); });
      testScheduler.ScheduleAbsolute(800, () => { connection.Dispose(); });

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(200, 1979),
          OnNext(340, 8)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Checks connection identity on a Never sequence published with an initial
  // value: two Connect calls without an intervening Dispose must return the
  // same object, and a Connect after disposal must return a different one.
  [Fact]
  public void PublishWithInitialValue_MultipleConnections()
  {
      IObservable<int> never = Observable.Never<int>();
      IConnectableObservable<int> published = never.Publish(1979);

      IDisposable first = published.Connect();
      IDisposable second = published.Connect();
      Assert.Same(first, second);

      // Both handles are disposed, exactly as in the original sequence.
      first.Dispose();
      second.Dispose();

      IDisposable third = published.Connect();
      Assert.NotSame(first, third);

      third.Dispose();
  }
  // Publishes a completing source with initial value 1979 through a selector
  // that zips the shared sequence with itself skipped by one, summing each
  // pair. The first expected output is 1982 at 220 (the initial value 1979
  // plus the source value 3), followed by the pairwise sums of consecutive
  // source values and completion at 600.
  [Fact]
  public void PublishWithInitialValueLambda_Zip_Complete()
  {
      TestScheduler testScheduler = new TestScheduler();

      ITestableObservable<int> source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      ITestableObserver<int> observed = testScheduler.Start(() =>
      {
          return source.Publish(
              shared => shared.Zip(shared.Skip(1), (previous, current) => current + previous),
              1979);
      });

      observed.Messages.AssertEqual(
          OnNext(220, 1982),
          OnNext(280, 7), OnNext(290, 5), OnNext(340, 9), OnNext(360, 13),
          OnNext(370, 11), OnNext(390, 13), OnNext(410, 20), OnNext(430, 15),
          OnNext(450, 11), OnNext(520, 20), OnNext(560, 31),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Same pairwise-sum selector and initial value 1979 as the completing
  // variant, but the source fails at 600. The same exception instance is
  // expected to reach the observer at 600, and the source subscription is
  // expected to span 200-600.
  [Fact]
  public void PublishWithInitialValueLambda_Zip_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      var observed = testScheduler.Start(
          () => source.Publish(
              shared => shared.Zip(shared.Skip(1), (previous, current) => current + previous),
              1979));

      observed.Messages.AssertEqual(
          OnNext(220, 1982),
          OnNext(280, 7), OnNext(290, 5), OnNext(340, 9), OnNext(360, 13),
          OnNext(370, 11), OnNext(390, 13), OnNext(410, 20), OnNext(430, 15),
          OnNext(450, 11), OnNext(520, 20), OnNext(560, 31),
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Same pairwise-sum selector and initial value 1979, but the run is started
  // with a disposal time of 470. Output is expected to stop after the value at
  // 450, with no terminal notification, and the source subscription is
  // expected to span 200-470.
  [Fact]
  public void PublishWithInitialValueLambda_Zip_Dispose()
  {
      TestScheduler testScheduler = new TestScheduler();

      ITestableObservable<int> source = testScheduler.CreateHotObservable(
          OnNext(110, 7), OnNext(220, 3), OnNext(280, 4), OnNext(290, 1),
          OnNext(340, 8), OnNext(360, 5), OnNext(370, 6), OnNext(390, 7),
          OnNext(410, 13), OnNext(430, 2), OnNext(450, 9),
          OnNext(520, 11), OnNext(560, 20),
          OnCompleted<int>(600)
      );

      Func<IObservable<int>> create = () =>
          source.Publish(
              shared => shared.Zip(shared.Skip(1), (previous, current) => current + previous),
              1979);

      ITestableObserver<int> observed = testScheduler.Start(create, 470);

      observed.Messages.AssertEqual(
          OnNext(220, 1982),
          OnNext(280, 7), OnNext(290, 5), OnNext(340, 9),
          OnNext(360, 13), OnNext(370, 11), OnNext(390, 13),
          OnNext(410, 20), OnNext(430, 15), OnNext(450, 11)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 470));
  }
  712. }
  713. }