ConcatTest.cs 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. using System.Diagnostics;
  20. namespace ReactiveTests.Tests
  21. {
  22. public class ConcatTest : ReactiveTest
  23. {
  24. [Fact]
  25. public void Concat_ArgumentChecking()
  26. {
  27. var xs = DummyObservable<int>.Instance;
  28. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat(xs, (IObservable<int>)null));
  29. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IObservable<int>)null, xs));
  30. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IObservable<int>[])null));
  31. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IEnumerable<IObservable<int>>)null));
  32. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IObservable<int>[])null));
  33. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IEnumerable<IObservable<int>>)null));
  34. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat(xs, null));
  35. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat(null, xs));
  36. }
  37. [Fact]
  38. public void Concat_DefaultScheduler()
  39. {
  40. var evt = new ManualResetEvent(false);
  41. int sum = 0;
  42. Observable.Concat(Observable.Return(1), Observable.Return(2), Observable.Return(3)).Subscribe(n =>
  43. {
  44. sum += n;
  45. }, () => evt.Set());
  46. evt.WaitOne();
  47. Assert.Equal(6, sum);
  48. }
  49. [Fact]
  50. public void Concat_IEofIO_DefaultScheduler()
  51. {
  52. var evt = new ManualResetEvent(false);
  53. IEnumerable<IObservable<int>> sources = new[] { Observable.Return(1), Observable.Return(2), Observable.Return(3) };
  54. int sum = 0;
  55. Observable.Concat(sources).Subscribe(n =>
  56. {
  57. sum += n;
  58. }, () => evt.Set());
  59. evt.WaitOne();
  60. Assert.Equal(6, sum);
  61. }
  62. [Fact]
  63. public void Concat_IEofIO_GetEnumeratorThrows()
  64. {
  65. var ex = new Exception();
  66. var scheduler = new TestScheduler();
  67. var xss = new RogueEnumerable<IObservable<int>>(ex);
  68. var res = scheduler.Start(() =>
  69. Observable.Concat(xss)
  70. );
  71. res.Messages.AssertEqual(
  72. OnError<int>(200, ex)
  73. );
  74. }
  75. [Fact]
  76. public void Concat_IEofIO()
  77. {
  78. var scheduler = new TestScheduler();
  79. var xs1 = scheduler.CreateColdObservable<int>(
  80. OnNext(10, 1),
  81. OnNext(20, 2),
  82. OnNext(30, 3),
  83. OnCompleted<int>(40)
  84. );
  85. var xs2 = scheduler.CreateColdObservable<int>(
  86. OnNext(10, 4),
  87. OnNext(20, 5),
  88. OnCompleted<int>(30)
  89. );
  90. var xs3 = scheduler.CreateColdObservable<int>(
  91. OnNext(10, 6),
  92. OnNext(20, 7),
  93. OnNext(30, 8),
  94. OnNext(40, 9),
  95. OnCompleted<int>(50)
  96. );
  97. var res = scheduler.Start(() =>
  98. Observable.Concat(new[] { xs1, xs2, xs3 })
  99. );
  100. res.Messages.AssertEqual(
  101. OnNext(210, 1),
  102. OnNext(220, 2),
  103. OnNext(230, 3),
  104. OnNext(250, 4),
  105. OnNext(260, 5),
  106. OnNext(280, 6),
  107. OnNext(290, 7),
  108. OnNext(300, 8),
  109. OnNext(310, 9),
  110. OnCompleted<int>(320)
  111. );
  112. xs1.Subscriptions.AssertEqual(
  113. Subscribe(200, 240)
  114. );
  115. xs2.Subscriptions.AssertEqual(
  116. Subscribe(240, 270)
  117. );
  118. xs3.Subscriptions.AssertEqual(
  119. Subscribe(270, 320)
  120. );
  121. }
  122. [Fact]
  123. public void Concat_EmptyEmpty()
  124. {
  125. var scheduler = new TestScheduler();
  126. var o1 = scheduler.CreateHotObservable(
  127. OnNext(150, 1),
  128. OnCompleted<int>(230)
  129. );
  130. var o2 = scheduler.CreateHotObservable(
  131. OnNext(150, 1),
  132. OnCompleted<int>(250)
  133. );
  134. var res = scheduler.Start(() =>
  135. o1.Concat(o2)
  136. );
  137. res.Messages.AssertEqual(
  138. OnCompleted<int>(250)
  139. );
  140. o1.Subscriptions.AssertEqual(
  141. Subscribe(200, 230)
  142. );
  143. o2.Subscriptions.AssertEqual(
  144. Subscribe(230, 250)
  145. );
  146. }
  147. [Fact]
  148. public void Concat_EmptyNever()
  149. {
  150. var scheduler = new TestScheduler();
  151. var o1 = scheduler.CreateHotObservable(
  152. OnNext(150, 1),
  153. OnCompleted<int>(230)
  154. );
  155. var o2 = scheduler.CreateHotObservable(
  156. OnNext(150, 1)
  157. );
  158. var res = scheduler.Start(() =>
  159. o1.Concat(o2)
  160. );
  161. res.Messages.AssertEqual(
  162. );
  163. o1.Subscriptions.AssertEqual(
  164. Subscribe(200, 230)
  165. );
  166. o2.Subscriptions.AssertEqual(
  167. Subscribe(230, 1000)
  168. );
  169. }
  170. [Fact]
  171. public void Concat_NeverEmpty()
  172. {
  173. var scheduler = new TestScheduler();
  174. var o1 = scheduler.CreateHotObservable(
  175. OnNext(150, 1)
  176. );
  177. var o2 = scheduler.CreateHotObservable(
  178. OnNext(150, 1),
  179. OnCompleted<int>(230)
  180. );
  181. var res = scheduler.Start(() =>
  182. o1.Concat(o2)
  183. );
  184. res.Messages.AssertEqual(
  185. );
  186. o1.Subscriptions.AssertEqual(
  187. Subscribe(200, 1000)
  188. );
  189. o2.Subscriptions.AssertEqual(
  190. );
  191. }
  192. [Fact]
  193. public void Concat_NeverNever()
  194. {
  195. var scheduler = new TestScheduler();
  196. var o1 = scheduler.CreateHotObservable(
  197. OnNext(150, 1)
  198. );
  199. var o2 = scheduler.CreateHotObservable(
  200. OnNext(150, 1)
  201. );
  202. var res = scheduler.Start(() =>
  203. o1.Concat(o2)
  204. );
  205. res.Messages.AssertEqual(
  206. );
  207. o1.Subscriptions.AssertEqual(
  208. Subscribe(200, 1000)
  209. );
  210. o2.Subscriptions.AssertEqual(
  211. );
  212. }
  213. [Fact]
  214. public void Concat_EmptyThrow()
  215. {
  216. var scheduler = new TestScheduler();
  217. var ex = new Exception();
  218. var o1 = scheduler.CreateHotObservable(
  219. OnNext(150, 1),
  220. OnCompleted<int>(230)
  221. );
  222. var o2 = scheduler.CreateHotObservable(
  223. OnNext(150, 1),
  224. OnError<int>(250, ex)
  225. );
  226. var res = scheduler.Start(() =>
  227. o1.Concat(o2)
  228. );
  229. res.Messages.AssertEqual(
  230. OnError<int>(250, ex)
  231. );
  232. o1.Subscriptions.AssertEqual(
  233. Subscribe(200, 230)
  234. );
  235. o2.Subscriptions.AssertEqual(
  236. Subscribe(230, 250)
  237. );
  238. }
  239. [Fact]
  240. public void Concat_ThrowEmpty()
  241. {
  242. var scheduler = new TestScheduler();
  243. var ex = new Exception();
  244. var o1 = scheduler.CreateHotObservable(
  245. OnNext(150, 1),
  246. OnError<int>(230, ex)
  247. );
  248. var o2 = scheduler.CreateHotObservable(
  249. OnNext(150, 1),
  250. OnCompleted<int>(250)
  251. );
  252. var res = scheduler.Start(() =>
  253. o1.Concat(o2)
  254. );
  255. res.Messages.AssertEqual(
  256. OnError<int>(230, ex)
  257. );
  258. o1.Subscriptions.AssertEqual(
  259. Subscribe(200, 230)
  260. );
  261. o2.Subscriptions.AssertEqual(
  262. );
  263. }
  264. [Fact]
  265. public void Concat_ThrowThrow()
  266. {
  267. var scheduler = new TestScheduler();
  268. var ex = new Exception();
  269. var o1 = scheduler.CreateHotObservable(
  270. OnNext(150, 1),
  271. OnError<int>(230, ex)
  272. );
  273. var o2 = scheduler.CreateHotObservable(
  274. OnNext(150, 1),
  275. OnError<int>(250, new Exception())
  276. );
  277. var res = scheduler.Start(() =>
  278. o1.Concat(o2)
  279. );
  280. res.Messages.AssertEqual(
  281. OnError<int>(230, ex)
  282. );
  283. o1.Subscriptions.AssertEqual(
  284. Subscribe(200, 230)
  285. );
  286. o2.Subscriptions.AssertEqual(
  287. );
  288. }
  289. [Fact]
  290. public void Concat_ReturnEmpty()
  291. {
  292. var scheduler = new TestScheduler();
  293. var o1 = scheduler.CreateHotObservable(
  294. OnNext(150, 1),
  295. OnNext(210, 2),
  296. OnCompleted<int>(230)
  297. );
  298. var o2 = scheduler.CreateHotObservable(
  299. OnNext(150, 1),
  300. OnCompleted<int>(250)
  301. );
  302. var res = scheduler.Start(() =>
  303. o1.Concat(o2)
  304. );
  305. res.Messages.AssertEqual(
  306. OnNext(210, 2),
  307. OnCompleted<int>(250)
  308. );
  309. o1.Subscriptions.AssertEqual(
  310. Subscribe(200, 230)
  311. );
  312. o2.Subscriptions.AssertEqual(
  313. Subscribe(230, 250)
  314. );
  315. }
  316. [Fact]
  317. public void Concat_EmptyReturn()
  318. {
  319. var scheduler = new TestScheduler();
  320. var o1 = scheduler.CreateHotObservable(
  321. OnNext(150, 1),
  322. OnCompleted<int>(230)
  323. );
  324. var o2 = scheduler.CreateHotObservable(
  325. OnNext(150, 1),
  326. OnNext(240, 2),
  327. OnCompleted<int>(250)
  328. );
  329. var res = scheduler.Start(() =>
  330. o1.Concat(o2)
  331. );
  332. res.Messages.AssertEqual(
  333. OnNext(240, 2),
  334. OnCompleted<int>(250)
  335. );
  336. o1.Subscriptions.AssertEqual(
  337. Subscribe(200, 230)
  338. );
  339. o2.Subscriptions.AssertEqual(
  340. Subscribe(230, 250)
  341. );
  342. }
  343. [Fact]
  344. public void Concat_ReturnNever()
  345. {
  346. var scheduler = new TestScheduler();
  347. var o1 = scheduler.CreateHotObservable(
  348. OnNext(150, 1),
  349. OnNext(210, 2),
  350. OnCompleted<int>(230)
  351. );
  352. var o2 = scheduler.CreateHotObservable(
  353. OnNext(150, 1)
  354. );
  355. var res = scheduler.Start(() =>
  356. o1.Concat(o2)
  357. );
  358. res.Messages.AssertEqual(
  359. OnNext(210, 2)
  360. );
  361. o1.Subscriptions.AssertEqual(
  362. Subscribe(200, 230)
  363. );
  364. o2.Subscriptions.AssertEqual(
  365. Subscribe(230, 1000)
  366. );
  367. }
  368. [Fact]
  369. public void Concat_NeverReturn()
  370. {
  371. var scheduler = new TestScheduler();
  372. var o1 = scheduler.CreateHotObservable(
  373. OnNext(150, 1)
  374. );
  375. var o2 = scheduler.CreateHotObservable(
  376. OnNext(150, 1),
  377. OnNext(210, 2),
  378. OnCompleted<int>(230)
  379. );
  380. var res = scheduler.Start(() =>
  381. o1.Concat(o2)
  382. );
  383. res.Messages.AssertEqual(
  384. );
  385. o1.Subscriptions.AssertEqual(
  386. Subscribe(200, 1000)
  387. );
  388. o2.Subscriptions.AssertEqual(
  389. );
  390. }
  391. [Fact]
  392. public void Concat_ReturnReturn()
  393. {
  394. var scheduler = new TestScheduler();
  395. var ex = new Exception();
  396. var o1 = scheduler.CreateHotObservable(
  397. OnNext(150, 1),
  398. OnNext(220, 2),
  399. OnCompleted<int>(230)
  400. );
  401. var o2 = scheduler.CreateHotObservable(
  402. OnNext(150, 1),
  403. OnNext(240, 3),
  404. OnCompleted<int>(250)
  405. );
  406. var res = scheduler.Start(() =>
  407. o1.Concat(o2)
  408. );
  409. res.Messages.AssertEqual(
  410. OnNext(220, 2),
  411. OnNext(240, 3),
  412. OnCompleted<int>(250)
  413. );
  414. o1.Subscriptions.AssertEqual(
  415. Subscribe(200, 230)
  416. );
  417. o2.Subscriptions.AssertEqual(
  418. Subscribe(230, 250)
  419. );
  420. }
  421. [Fact]
  422. public void Concat_ThrowReturn()
  423. {
  424. var scheduler = new TestScheduler();
  425. var ex = new Exception();
  426. var o1 = scheduler.CreateHotObservable(
  427. OnNext(150, 1),
  428. OnError<int>(230, ex)
  429. );
  430. var o2 = scheduler.CreateHotObservable(
  431. OnNext(150, 1),
  432. OnNext(240, 2),
  433. OnCompleted<int>(250)
  434. );
  435. var res = scheduler.Start(() =>
  436. o1.Concat(o2)
  437. );
  438. res.Messages.AssertEqual(
  439. OnError<int>(230, ex)
  440. );
  441. o1.Subscriptions.AssertEqual(
  442. Subscribe(200, 230)
  443. );
  444. o2.Subscriptions.AssertEqual(
  445. );
  446. }
  447. [Fact]
  448. public void Concat_ReturnThrow()
  449. {
  450. var scheduler = new TestScheduler();
  451. var ex = new Exception();
  452. var o1 = scheduler.CreateHotObservable(
  453. OnNext(150, 1),
  454. OnNext(220, 2),
  455. OnCompleted<int>(230)
  456. );
  457. var o2 = scheduler.CreateHotObservable(
  458. OnNext(150, 1),
  459. OnError<int>(250, ex)
  460. );
  461. var res = scheduler.Start(() =>
  462. o1.Concat(o2)
  463. );
  464. res.Messages.AssertEqual(
  465. OnNext(220, 2),
  466. OnError<int>(250, ex)
  467. );
  468. o1.Subscriptions.AssertEqual(
  469. Subscribe(200, 230)
  470. );
  471. o2.Subscriptions.AssertEqual(
  472. Subscribe(230, 250)
  473. );
  474. }
  475. [Fact]
  476. public void Concat_SomeDataSomeData()
  477. {
  478. var scheduler = new TestScheduler();
  479. var ex = new Exception();
  480. var o1 = scheduler.CreateHotObservable(
  481. OnNext(150, 1),
  482. OnNext(210, 2),
  483. OnNext(220, 3),
  484. OnCompleted<int>(225)
  485. );
  486. var o2 = scheduler.CreateHotObservable(
  487. OnNext(150, 1),
  488. OnNext(230, 4),
  489. OnNext(240, 5),
  490. OnCompleted<int>(250)
  491. );
  492. var res = scheduler.Start(() =>
  493. o1.Concat(o2)
  494. );
  495. res.Messages.AssertEqual(
  496. OnNext(210, 2),
  497. OnNext(220, 3),
  498. OnNext(230, 4),
  499. OnNext(240, 5),
  500. OnCompleted<int>(250)
  501. );
  502. o1.Subscriptions.AssertEqual(
  503. Subscribe(200, 225)
  504. );
  505. o2.Subscriptions.AssertEqual(
  506. Subscribe(225, 250)
  507. );
  508. }
  509. [Fact]
  510. public void Concat_EnumerableThrows()
  511. {
  512. var scheduler = new TestScheduler();
  513. var o = scheduler.CreateHotObservable(
  514. OnNext(150, 1),
  515. OnNext(210, 2),
  516. OnNext(220, 3),
  517. OnCompleted<int>(225)
  518. );
  519. var ex = new Exception();
  520. var xss = new MockEnumerable<IObservable<int>>(scheduler, GetObservablesForConcatThrow(o, ex));
  521. var res = scheduler.Start(() =>
  522. xss.Concat()
  523. );
  524. res.Messages.AssertEqual(
  525. OnNext(210, 2),
  526. OnNext(220, 3),
  527. OnError<int>(225, ex)
  528. );
  529. o.Subscriptions.AssertEqual(
  530. Subscribe(200, 225)
  531. );
  532. xss.Subscriptions.AssertEqual(
  533. Subscribe(200, 225)
  534. );
  535. }
  536. private IEnumerable<IObservable<int>> GetObservablesForConcatThrow(IObservable<int> first, Exception ex)
  537. {
  538. yield return first;
  539. throw ex;
  540. }
  541. [Fact]
  542. public void Concat_EnumerableTiming()
  543. {
  544. var scheduler = new TestScheduler();
  545. var o1 = scheduler.CreateHotObservable(
  546. OnNext(150, 1),
  547. OnNext(210, 2), // !
  548. OnNext(220, 3), // !
  549. OnCompleted<int>(230)
  550. );
  551. var o2 = scheduler.CreateColdObservable(
  552. OnNext(50, 4), // !
  553. OnNext(60, 5), // !
  554. OnNext(70, 6), // !
  555. OnCompleted<int>(80)
  556. );
  557. var o3 = scheduler.CreateHotObservable(
  558. OnNext(150, 1),
  559. OnNext(200, 2),
  560. OnNext(210, 3),
  561. OnNext(220, 4),
  562. OnNext(230, 5),
  563. OnNext(270, 6),
  564. OnNext(320, 7), // !
  565. OnNext(330, 8), // !
  566. OnCompleted<int>(340)
  567. );
  568. var xss = new MockEnumerable<ITestableObservable<int>>(scheduler, new[] { o1, o2, o3, o2 });
  569. var res = scheduler.Start(() =>
  570. xss.Select(xs => (IObservable<int>)xs).Concat()
  571. );
  572. res.Messages.AssertEqual(
  573. OnNext(210, 2),
  574. OnNext(220, 3),
  575. OnNext(280, 4),
  576. OnNext(290, 5),
  577. OnNext(300, 6),
  578. OnNext(320, 7),
  579. OnNext(330, 8),
  580. OnNext(390, 4),
  581. OnNext(400, 5),
  582. OnNext(410, 6),
  583. OnCompleted<int>(420)
  584. );
  585. o1.Subscriptions.AssertEqual(
  586. Subscribe(200, 230)
  587. );
  588. o2.Subscriptions.AssertEqual(
  589. Subscribe(230, 310),
  590. Subscribe(340, 420)
  591. );
  592. o3.Subscriptions.AssertEqual(
  593. Subscribe(310, 340)
  594. );
  595. xss.Subscriptions.AssertEqual(
  596. Subscribe(200, 420)
  597. );
  598. }
  599. [Fact]
  600. public void Concat_Enumerable_Dispose()
  601. {
  602. var scheduler = new TestScheduler();
  603. var o1 = scheduler.CreateHotObservable(
  604. OnNext(150, 1),
  605. OnNext(210, 2),
  606. OnNext(220, 3),
  607. OnCompleted<int>(230)
  608. );
  609. var o2 = scheduler.CreateHotObservable(
  610. OnNext(150, 1),
  611. OnNext(200, 2),
  612. OnNext(210, 3),
  613. OnNext(240, 4),
  614. OnNext(270, 5),
  615. OnNext(320, 6),
  616. OnNext(330, 7),
  617. OnCompleted<int>(340)
  618. );
  619. var xss = new MockEnumerable<ITestableObservable<int>>(scheduler, new[] { o1, o2 });
  620. var res = scheduler.Start(() =>
  621. xss.Select(xs => (IObservable<int>)xs).Concat(),
  622. 300
  623. );
  624. res.Messages.AssertEqual(
  625. OnNext(210, 2),
  626. OnNext(220, 3),
  627. OnNext(240, 4),
  628. OnNext(270, 5)
  629. );
  630. o1.Subscriptions.AssertEqual(
  631. Subscribe(200, 230)
  632. );
  633. o2.Subscriptions.AssertEqual(
  634. Subscribe(230, 300)
  635. );
  636. xss.Subscriptions.AssertEqual(
  637. Subscribe(200, 300)
  638. );
  639. }
  640. [Fact]
  641. public void Concat_Optimization_DeferEvalTiming()
  642. {
  643. var scheduler = new TestScheduler();
  644. var o1 = scheduler.CreateHotObservable(
  645. OnNext(150, 1),
  646. OnNext(210, 2),
  647. OnNext(220, 3),
  648. OnCompleted<int>(230)
  649. );
  650. var o2 = scheduler.CreateColdObservable(
  651. OnNext(10, 4),
  652. OnNext(20, 5),
  653. OnNext(30, 6),
  654. OnCompleted<int>(40)
  655. );
  656. var invoked = default(long);
  657. var xs = o1;
  658. var ys = Observable.Defer(() => { invoked = scheduler.Clock; return o2; });
  659. var res = scheduler.Start(() =>
  660. xs.Concat(ys)
  661. );
  662. res.Messages.AssertEqual(
  663. OnNext(210, 2),
  664. OnNext(220, 3),
  665. OnNext(240, 4),
  666. OnNext(250, 5),
  667. OnNext(260, 6),
  668. OnCompleted<int>(270)
  669. );
  670. o1.Subscriptions.AssertEqual(
  671. Subscribe(200, 230)
  672. );
  673. o2.Subscriptions.AssertEqual(
  674. Subscribe(230, 270)
  675. );
  676. Assert.Equal(230, invoked);
  677. }
  678. [Fact]
  679. public void Concat_Optimization_DeferExceptionPropagation()
  680. {
  681. var scheduler = new TestScheduler();
  682. var o1 = scheduler.CreateHotObservable(
  683. OnNext(150, 1),
  684. OnNext(210, 2),
  685. OnCompleted<int>(220)
  686. );
  687. var ex = new Exception();
  688. var invoked = default(long);
  689. var xs = o1;
  690. var ys = Observable.Defer<int>(new Func<IObservable<int>>(() => { invoked = scheduler.Clock; throw ex; }));
  691. var res = scheduler.Start(() =>
  692. xs.Concat(ys)
  693. );
  694. res.Messages.AssertEqual(
  695. OnNext(210, 2),
  696. OnError<int>(220, ex)
  697. );
  698. o1.Subscriptions.AssertEqual(
  699. Subscribe(200, 220)
  700. );
  701. Assert.Equal(220, invoked);
  702. }
  703. #if !NO_PERF
  704. [Fact]
  705. public void Concat_TailRecursive1()
  706. {
  707. var create = 0L;
  708. var start = 200L;
  709. var end = 1000L;
  710. var scheduler = new TestScheduler();
  711. var o = scheduler.CreateColdObservable<int>(
  712. OnNext(10, 1),
  713. OnNext(20, 2),
  714. OnNext(30, 3),
  715. OnCompleted<int>(40)
  716. );
  717. var f = default(Func<IObservable<int>>);
  718. f = () => Observable.Defer(() => o.Concat(f()));
  719. var res = scheduler.Start(() => f(), create, start, end);
  720. var expected = new List<Recorded<Notification<int>>>();
  721. var t = start;
  722. while (t <= end)
  723. {
  724. var n = (t - start) / 10;
  725. if (n % 4 != 0)
  726. {
  727. expected.Add(OnNext(t, (int)(n % 4)));
  728. }
  729. t += 10;
  730. }
  731. res.Messages.AssertEqual(expected);
  732. }
  733. #if !NO_THREAD && !NETCOREAPP1_1 && !NETCOREAPP1_0
  734. [Fact]
  735. public void Concat_TailRecursive2()
  736. {
  737. var f = default(Func<int, IObservable<int>>);
  738. f = x => Observable.Defer(() => Observable.Return(x, ThreadPoolScheduler.Instance).Concat(f(x + 1)));
  739. var lst = new List<int>();
  740. f(0).Select(x => new StackTrace().FrameCount).Take(10).ForEach(lst.Add);
  741. Assert.True(lst.Last() - lst.First() < 10);
  742. }
  743. #endif
  744. #endif
  745. [Fact]
  746. public void Concat_Task()
  747. {
  748. var tss = Observable.Concat(new[] { Task.Factory.StartNew(() => 1), Task.Factory.StartNew(() => 2), Task.Factory.StartNew(() => 3) }.ToObservable());
  749. var res = tss.ToArray().Single();
  750. Assert.True(res.SequenceEqual(new[] { 1, 2, 3 }));
  751. }
  752. }
  753. }