ConcatTest.cs 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.Linq;
  8. using System.Reactive;
  9. using System.Reactive.Concurrency;
  10. using System.Reactive.Linq;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using Microsoft.Reactive.Testing;
  14. using ReactiveTests.Dummies;
  15. using Microsoft.VisualStudio.TestTools.UnitTesting;
  16. using Assert = Xunit.Assert;
  17. namespace ReactiveTests.Tests
  18. {
  19. [TestClass]
  20. public class ConcatTest : ReactiveTest
  21. {
  /// <summary>
  /// Verifies that the <c>Observable.Concat</c> overloads exercised below reject
  /// null arguments with an <see cref="ArgumentNullException"/>.
  /// </summary>
  [TestMethod]
  public void Concat_ArgumentChecking()
  {
      var xs = DummyObservable<int>.Instance;

      // Binary overload: either operand may be the null one.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat(xs, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat(null, xs));

      // Collection overloads: array (params) and enumerable of sources.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IObservable<int>[])null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IEnumerable<IObservable<int>>)null));

      // Nested overloads: observable of observables and observable of tasks.
      // (These replace four assertions that merely repeated the ones above.)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IObservable<IObservable<int>>)null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IObservable<Task<int>>)null));
  }
  /// <summary>
  /// Concatenates three single-element sequences without specifying a scheduler and
  /// checks that all elements are delivered (1 + 2 + 3 = 6) before completion.
  /// </summary>
  [TestMethod]
  public void Concat_DefaultScheduler()
  {
      // Dispose the wait handle deterministically instead of leaving it to the finalizer.
      using var completed = new ManualResetEvent(false);
      var sum = 0;

      // Declared after the event so it is disposed first; a late completion can
      // therefore never call Set() on an already-disposed handle.
      using var subscription = Observable
          .Concat(Observable.Return(1), Observable.Return(2), Observable.Return(3))
          .Subscribe(n => { sum += n; }, () => completed.Set());

      // Bounded wait: a regression that drops OnCompleted fails the test instead of hanging the run.
      Assert.True(completed.WaitOne(TimeSpan.FromSeconds(30)), "Concat did not complete within the timeout.");
      Assert.Equal(6, sum);
  }
  /// <summary>
  /// Same as <c>Concat_DefaultScheduler</c>, but the sources are supplied as an
  /// <see cref="IEnumerable{T}"/> of observables; the sum of all elements must be 6.
  /// </summary>
  [TestMethod]
  public void Concat_IEofIO_DefaultScheduler()
  {
      // Dispose the wait handle deterministically instead of leaving it to the finalizer.
      using var completed = new ManualResetEvent(false);
      IEnumerable<IObservable<int>> sources = [Observable.Return(1), Observable.Return(2), Observable.Return(3)];
      var sum = 0;

      // Declared after the event so it is disposed first; a late completion can
      // therefore never call Set() on an already-disposed handle.
      using var subscription = Observable
          .Concat(sources)
          .Subscribe(n => { sum += n; }, () => completed.Set());

      // Bounded wait: a regression that drops OnCompleted fails the test instead of hanging the run.
      Assert.True(completed.WaitOne(TimeSpan.FromSeconds(30)), "Concat did not complete within the timeout.");
      Assert.Equal(6, sum);
  }
  /// <summary>
  /// Feeds Concat a <c>RogueEnumerable</c> constructed with an exception and expects
  /// that exception to surface as OnError at the subscription time (200).
  /// </summary>
  [TestMethod]
  public void Concat_IEofIO_GetEnumeratorThrows()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();
      var rogueSources = new RogueEnumerable<IObservable<int>>(failure);

      var observer = scheduler.Start(() => Observable.Concat(rogueSources));

      observer.Messages.AssertEqual(OnError<int>(200, failure));
  }
  /// <summary>
  /// Concatenates three cold sequences passed as a collection and checks that each
  /// one is subscribed only when its predecessor completes (200-240, 240-270, 270-320),
  /// with all values forwarded in order.
  /// </summary>
  [TestMethod]
  public void Concat_IEofIO()
  {
      var scheduler = new TestScheduler();

      // Cold sources: times are relative to the moment of subscription.
      var first = scheduler.CreateColdObservable(
          OnNext(10, 1),
          OnNext(20, 2),
          OnNext(30, 3),
          OnCompleted<int>(40)
      );
      var second = scheduler.CreateColdObservable(
          OnNext(10, 4),
          OnNext(20, 5),
          OnCompleted<int>(30)
      );
      var third = scheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(20, 7),
          OnNext(30, 8),
          OnNext(40, 9),
          OnCompleted<int>(50)
      );

      var observer = scheduler.Start(() => Observable.Concat([first, second, third]));

      observer.Messages.AssertEqual(
          // first, subscribed at 200
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          // second, subscribed at 240
          OnNext(250, 4),
          OnNext(260, 5),
          // third, subscribed at 270
          OnNext(280, 6),
          OnNext(290, 7),
          OnNext(300, 8),
          OnNext(310, 9),
          OnCompleted<int>(320)
      );

      first.Subscriptions.AssertEqual(Subscribe(200, 240));
      second.Subscriptions.AssertEqual(Subscribe(240, 270));
      third.Subscriptions.AssertEqual(Subscribe(270, 320));
  }
  /// <summary>
  /// Two hot sources that produce nothing after subscription and complete at 230 and 250:
  /// the result carries a single completion at 250, and the second source is
  /// subscribed exactly when the first one completes.
  /// </summary>
  [TestMethod]
  public void Concat_EmptyEmpty()
  {
      var scheduler = new TestScheduler();

      // The values at 150 precede the subscription at 200 and are never observed.
      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(230)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(250)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual(OnCompleted<int>(250));
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 250));
  }
  /// <summary>
  /// First source completes at 230, second never terminates: no messages are
  /// expected and the second subscription stays open until the run ends at 1000.
  /// </summary>
  [TestMethod]
  public void Concat_EmptyNever()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(230)
      );
      // No terminal notification: this source never ends.
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual();
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 1000));
  }
  /// <summary>
  /// First source never terminates, so the second (which would complete at 230)
  /// must never be subscribed and no messages are produced.
  /// </summary>
  [TestMethod]
  public void Concat_NeverEmpty()
  {
      var scheduler = new TestScheduler();

      // No terminal notification: this source never ends.
      var first = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(230)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual();
      first.Subscriptions.AssertEqual(Subscribe(200, 1000));
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Neither source terminates: only the first is ever subscribed (200-1000)
  /// and the observer receives nothing.
  /// </summary>
  [TestMethod]
  public void Concat_NeverNever()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual();
      first.Subscriptions.AssertEqual(Subscribe(200, 1000));
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// First source completes at 230, second fails at 250: the error from the
  /// second source is the only message the observer sees.
  /// </summary>
  [TestMethod]
  public void Concat_EmptyThrow()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(230)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(250, failure)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual(OnError<int>(250, failure));
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 250));
  }
  /// <summary>
  /// First source fails at 230: the error is forwarded immediately and the
  /// second source is never subscribed.
  /// </summary>
  [TestMethod]
  public void Concat_ThrowEmpty()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(230, failure)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(250)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual(OnError<int>(230, failure));
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Both sources fail, each with its own exception instance: only the first
  /// source's error (at 230) is observed and the second source is never subscribed.
  /// </summary>
  [TestMethod]
  public void Concat_ThrowThrow()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(230, failure)
      );
      // Distinct exception instance, so a leaked second error would not match the assertion.
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(250, new Exception())
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual(OnError<int>(230, failure));
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// First source yields one value (210) and completes at 230; second only
  /// completes at 250. Expected output: the value, then completion at 250.
  /// </summary>
  [TestMethod]
  public void Concat_ReturnEmpty()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(230)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(250)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnCompleted<int>(250)
      );
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 250));
  }
  /// <summary>
  /// First source only completes (230); second yields one value at 240 and
  /// completes at 250. Expected output: the value, then completion at 250.
  /// </summary>
  [TestMethod]
  public void Concat_EmptyReturn()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(230)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(240, 2),
          OnCompleted<int>(250)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual(
          OnNext(240, 2),
          OnCompleted<int>(250)
      );
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 250));
  }
  /// <summary>
  /// First source yields one value and completes at 230; second never terminates.
  /// Only the value is observed and the second subscription runs until 1000.
  /// </summary>
  [TestMethod]
  public void Concat_ReturnNever()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(230)
      );
      // No terminal notification: this source never ends.
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual(OnNext(210, 2));
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 1000));
  }
  /// <summary>
  /// First source never terminates, so the second source (value at 210,
  /// completion at 230) is never subscribed and nothing is observed.
  /// </summary>
  [TestMethod]
  public void Concat_NeverReturn()
  {
      var scheduler = new TestScheduler();

      // No terminal notification: this source never ends.
      var first = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(230)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual();
      first.Subscriptions.AssertEqual(Subscribe(200, 1000));
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Each source yields one value after subscription (220 and 240) and completes
  /// (230 and 250): both values are forwarded in order, followed by completion at 250.
  /// </summary>
  [TestMethod]
  public void Concat_ReturnReturn()
  {
      // The unused exception local from the original was removed; no error path is exercised here.
      var scheduler = new TestScheduler();

      var o1 = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );
      var o2 = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(240, 3),
          OnCompleted<int>(250)
      );

      var res = scheduler.Start(() =>
          o1.Concat(o2)
      );

      res.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(240, 3),
          OnCompleted<int>(250)
      );
      o1.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
      o2.Subscriptions.AssertEqual(
          Subscribe(230, 250)
      );
  }
  /// <summary>
  /// First source fails at 230: the error is forwarded and the second source
  /// (value at 240, completion at 250) is never subscribed.
  /// </summary>
  [TestMethod]
  public void Concat_ThrowReturn()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(230, failure)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(240, 2),
          OnCompleted<int>(250)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual(OnError<int>(230, failure));
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// First source yields one value (220) and completes at 230; second fails at 250.
  /// Expected output: the value followed by the second source's error.
  /// </summary>
  [TestMethod]
  public void Concat_ReturnThrow()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(250, failure)
      );

      var observer = scheduler.Start(() => first.Concat(second));

      observer.Messages.AssertEqual(
          OnNext(220, 2),
          OnError<int>(250, failure)
      );
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 250));
  }
  /// <summary>
  /// Both sources yield two values after subscription: the first completes at 225,
  /// at which point the second is subscribed; all four values are forwarded in
  /// order, followed by completion at 250.
  /// </summary>
  [TestMethod]
  public void Concat_SomeDataSomeData()
  {
      // The unused exception local from the original was removed; no error path is exercised here.
      var scheduler = new TestScheduler();

      var o1 = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnCompleted<int>(225)
      );
      var o2 = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var res = scheduler.Start(() =>
          o1.Concat(o2)
      );

      res.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );
      o1.Subscriptions.AssertEqual(
          Subscribe(200, 225)
      );
      o2.Subscriptions.AssertEqual(
          Subscribe(225, 250)
      );
  }
  /// <summary>
  /// Concatenates an enumerable that yields one source and then throws while being
  /// advanced: the source's values are forwarded, and the enumerator's exception
  /// surfaces as OnError at the time the source completes (225).
  /// </summary>
  [TestMethod]
  public void Concat_EnumerableThrows()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnCompleted<int>(225)
      );

      // Wrapping in MockEnumerable lets the test assert on the enumeration window too.
      var sources = new MockEnumerable<IObservable<int>>(scheduler, GetObservablesForConcatThrow(source, failure));

      var observer = scheduler.Start(() => sources.Concat());

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnError<int>(225, failure)
      );
      source.Subscriptions.AssertEqual(Subscribe(200, 225));
      sources.Subscriptions.AssertEqual(Subscribe(200, 225));
  }
  /// <summary>
  /// Iterator that yields <paramref name="first"/> and then throws
  /// <paramref name="ex"/> when the consumer asks for the next element.
  /// </summary>
  /// <param name="first">The only observable the sequence produces.</param>
  /// <param name="ex">The exception raised on the second advance of the enumerator.</param>
  /// <returns>A lazily evaluated sequence of one element followed by a failure.</returns>
  private static IEnumerable<IObservable<int>> GetObservablesForConcatThrow(IObservable<int> first, Exception ex)
  {
      yield return first;

      // Deliberately raises the caller-supplied instance (this is not a rethrow),
      // so the test can assert on reference equality of the observed error.
      throw ex;
  }
  /// <summary>
  /// Concatenates the enumerable [hot, cold, hot, cold] where the cold source is
  /// reused. Checks that each source is subscribed only when the previous one
  /// completes, that hot sources drop values emitted before their subscription,
  /// and that the enumerable itself is held for the whole run (200-420).
  /// </summary>
  [TestMethod]
  public void Concat_EnumerableTiming()
  {
      var scheduler = new TestScheduler();

      // Subscribed 200-230: the values at 210 and 220 are observed.
      var hotFirst = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnCompleted<int>(230)
      );

      // Cold (relative times); used twice, so all three values are observed twice.
      var cold = scheduler.CreateColdObservable(
          OnNext(50, 4),
          OnNext(60, 5),
          OnNext(70, 6),
          OnCompleted<int>(80)
      );

      // Subscribed 310-340: only the values at 320 and 330 are observed.
      var hotSecond = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(200, 2),
          OnNext(210, 3),
          OnNext(220, 4),
          OnNext(230, 5),
          OnNext(270, 6),
          OnNext(320, 7),
          OnNext(330, 8),
          OnCompleted<int>(340)
      );

      var sources = new MockEnumerable<ITestableObservable<int>>(scheduler, [hotFirst, cold, hotSecond, cold]);

      var observer = scheduler.Start(() => sources.Select(xs => (IObservable<int>)xs).Concat());

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 5),
          OnNext(300, 6),
          OnNext(320, 7),
          OnNext(330, 8),
          OnNext(390, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnCompleted<int>(420)
      );

      hotFirst.Subscriptions.AssertEqual(Subscribe(200, 230));
      cold.Subscriptions.AssertEqual(
          Subscribe(230, 310),
          Subscribe(340, 420)
      );
      hotSecond.Subscriptions.AssertEqual(Subscribe(310, 340));
      sources.Subscriptions.AssertEqual(Subscribe(200, 420));
  }
  /// <summary>
  /// Disposes the concatenated subscription at 300 while the second source is
  /// still running: values up to 270 are observed, and both the active source
  /// and the enumerable are released at 300.
  /// </summary>
  [TestMethod]
  public void Concat_Enumerable_Dispose()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnCompleted<int>(230)
      );
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(200, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(320, 6),
          OnNext(330, 7),
          OnCompleted<int>(340)
      );

      var sources = new MockEnumerable<ITestableObservable<int>>(scheduler, [first, second]);

      // Second argument to Start is the disposal time.
      var observer = scheduler.Start(
          () => sources.Select(xs => (IObservable<int>)xs).Concat(),
          300
      );

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(240, 4),
          OnNext(270, 5)
      );

      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 300));
      sources.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// Concatenates a hot source with a deferred cold source and records the
  /// scheduler clock inside the Defer factory: the factory must run at 230,
  /// i.e. when the first source completes, not earlier.
  /// </summary>
  [TestMethod]
  public void Concat_Optimization_DeferEvalTiming()
  {
      var scheduler = new TestScheduler();

      var hot = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnCompleted<int>(230)
      );
      var cold = scheduler.CreateColdObservable(
          OnNext(10, 4),
          OnNext(20, 5),
          OnNext(30, 6),
          OnCompleted<int>(40)
      );

      // Clock value captured when the Defer factory is invoked; stays 0 if it never runs.
      var invokedAt = default(long);
      var deferred = Observable.Defer(() => { invokedAt = scheduler.Clock; return cold; });

      var observer = scheduler.Start(() => hot.Concat(deferred));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      hot.Subscriptions.AssertEqual(Subscribe(200, 230));
      cold.Subscriptions.AssertEqual(Subscribe(230, 270));

      Assert.Equal(230, invokedAt);
  }
  /// <summary>
  /// Concatenates a hot source with a deferred source whose factory throws:
  /// the factory must run when the first source completes (220) and its
  /// exception must reach the observer as OnError at that same time.
  /// </summary>
  [TestMethod]
  public void Concat_Optimization_DeferExceptionPropagation()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var hot = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(220)
      );

      // Clock value captured when the Defer factory is invoked; stays 0 if it never runs.
      var invokedAt = default(long);
      var deferred = Observable.Defer(new Func<IObservable<int>>(() => { invokedAt = scheduler.Clock; throw failure; }));

      var observer = scheduler.Start(() => hot.Concat(deferred));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnError<int>(220, failure)
      );

      hot.Subscriptions.AssertEqual(Subscribe(200, 220));

      Assert.Equal(220, invokedAt);
  }
  701. #if !NO_PERF
  /// <summary>
  /// Builds an endlessly self-concatenating sequence via Defer and runs it from
  /// 200 to 1000. The cold source emits 1, 2, 3 at 10-tick intervals and completes
  /// on the fourth tick, so the expected output repeats that pattern with a gap
  /// on every fourth tick.
  /// </summary>
  [TestMethod]
  public void Concat_TailRecursive1()
  {
      var create = 0L;
      var start = 200L;
      var end = 1000L;

      var scheduler = new TestScheduler();

      var cycle = scheduler.CreateColdObservable(
          OnNext(10, 1),
          OnNext(20, 2),
          OnNext(30, 3),
          OnCompleted<int>(40)
      );

      // Each repetition defers construction of the next one.
      IObservable<int> f() => Observable.Defer(() => cycle.Concat(f()));

      var observer = scheduler.Start(() => f(), create, start, end);

      var expected = new List<Recorded<Notification<int>>>();
      for (var tick = start; tick <= end; tick += 10)
      {
          // Position within the 4-tick cycle; 0 is the completion/resubscription slot.
          var phase = (tick - start) / 10 % 4;
          if (phase != 0)
          {
              expected.Add(OnNext(tick, (int)phase));
          }
      }

      observer.Messages.AssertEqual(expected);
  }
  /// <summary>
  /// Runs a self-concatenating sequence on the thread pool and samples the stack
  /// depth at each of the first ten elements; the depth must grow by less than
  /// 10 frames between the first and last sample.
  /// </summary>
  [TestMethod]
  public void Concat_TailRecursive2()
  {
      IObservable<int> f(int x) => Observable.Defer(() => Observable.Return(x, ThreadPoolScheduler.Instance).Concat(f(x + 1)));

      var frameCounts = new List<int>();

      // Map each element to the current stack depth and collect ten samples.
      f(0).Select(x => new StackTrace().FrameCount).Take(10).ForEach(frameCounts.Add);

      var growth = frameCounts.Last() - frameCounts.First();
      Assert.True(growth < 10);
  }
  738. #endif
  /// <summary>
  /// Concatenates an observable sequence of tasks and checks that the task
  /// results are produced in the order of the tasks: 1, 2, 3.
  /// </summary>
  [TestMethod]
  public void Concat_Task()
  {
      var tss = Observable.Concat(new[] { Task.Factory.StartNew(() => 1), Task.Factory.StartNew(() => 2), Task.Factory.StartNew(() => 3) }.ToObservable());

      var res = tss.ToArray().Single();

      // Compare as collections so a failure reports the differing elements
      // rather than a bare "False".
      Assert.Equal(new[] { 1, 2, 3 }, res);
  }
  746. }
  747. }