ConcatTest.cs 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.Linq;
  8. using System.Reactive;
  9. using System.Reactive.Concurrency;
  10. using System.Reactive.Linq;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using Microsoft.Reactive.Testing;
  14. using ReactiveTests.Dummies;
  15. using Xunit;
  16. namespace ReactiveTests.Tests
  17. {
  18. public class ConcatTest : ReactiveTest
  19. {
  20. [Fact]
  21. public void Concat_ArgumentChecking()
  22. {
  23. var xs = DummyObservable<int>.Instance;
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat(xs, null));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat(null, xs));
  26. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IObservable<int>[])null));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IEnumerable<IObservable<int>>)null));
  28. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IObservable<int>[])null));
  29. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat((IEnumerable<IObservable<int>>)null));
  30. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat(xs, null));
  31. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Concat(null, xs));
  32. }
  33. [Fact]
  34. public void Concat_DefaultScheduler()
  35. {
  36. var evt = new ManualResetEvent(false);
  37. var sum = 0;
  38. Observable.Concat(Observable.Return(1), Observable.Return(2), Observable.Return(3)).Subscribe(n =>
  39. {
  40. sum += n;
  41. }, () => evt.Set());
  42. evt.WaitOne();
  43. Assert.Equal(6, sum);
  44. }
  45. [Fact]
  46. public void Concat_IEofIO_DefaultScheduler()
  47. {
  48. var evt = new ManualResetEvent(false);
  49. IEnumerable<IObservable<int>> sources = new[] { Observable.Return(1), Observable.Return(2), Observable.Return(3) };
  50. var sum = 0;
  51. Observable.Concat(sources).Subscribe(n =>
  52. {
  53. sum += n;
  54. }, () => evt.Set());
  55. evt.WaitOne();
  56. Assert.Equal(6, sum);
  57. }
  58. [Fact]
  59. public void Concat_IEofIO_GetEnumeratorThrows()
  60. {
  61. var ex = new Exception();
  62. var scheduler = new TestScheduler();
  63. var xss = new RogueEnumerable<IObservable<int>>(ex);
  64. var res = scheduler.Start(() =>
  65. Observable.Concat(xss)
  66. );
  67. res.Messages.AssertEqual(
  68. OnError<int>(200, ex)
  69. );
  70. }
  71. [Fact]
  72. public void Concat_IEofIO()
  73. {
  74. var scheduler = new TestScheduler();
  75. var xs1 = scheduler.CreateColdObservable(
  76. OnNext(10, 1),
  77. OnNext(20, 2),
  78. OnNext(30, 3),
  79. OnCompleted<int>(40)
  80. );
  81. var xs2 = scheduler.CreateColdObservable(
  82. OnNext(10, 4),
  83. OnNext(20, 5),
  84. OnCompleted<int>(30)
  85. );
  86. var xs3 = scheduler.CreateColdObservable(
  87. OnNext(10, 6),
  88. OnNext(20, 7),
  89. OnNext(30, 8),
  90. OnNext(40, 9),
  91. OnCompleted<int>(50)
  92. );
  93. var res = scheduler.Start(() =>
  94. Observable.Concat(new[] { xs1, xs2, xs3 })
  95. );
  96. res.Messages.AssertEqual(
  97. OnNext(210, 1),
  98. OnNext(220, 2),
  99. OnNext(230, 3),
  100. OnNext(250, 4),
  101. OnNext(260, 5),
  102. OnNext(280, 6),
  103. OnNext(290, 7),
  104. OnNext(300, 8),
  105. OnNext(310, 9),
  106. OnCompleted<int>(320)
  107. );
  108. xs1.Subscriptions.AssertEqual(
  109. Subscribe(200, 240)
  110. );
  111. xs2.Subscriptions.AssertEqual(
  112. Subscribe(240, 270)
  113. );
  114. xs3.Subscriptions.AssertEqual(
  115. Subscribe(270, 320)
  116. );
  117. }
  118. [Fact]
  119. public void Concat_EmptyEmpty()
  120. {
  121. var scheduler = new TestScheduler();
  122. var o1 = scheduler.CreateHotObservable(
  123. OnNext(150, 1),
  124. OnCompleted<int>(230)
  125. );
  126. var o2 = scheduler.CreateHotObservable(
  127. OnNext(150, 1),
  128. OnCompleted<int>(250)
  129. );
  130. var res = scheduler.Start(() =>
  131. o1.Concat(o2)
  132. );
  133. res.Messages.AssertEqual(
  134. OnCompleted<int>(250)
  135. );
  136. o1.Subscriptions.AssertEqual(
  137. Subscribe(200, 230)
  138. );
  139. o2.Subscriptions.AssertEqual(
  140. Subscribe(230, 250)
  141. );
  142. }
  143. [Fact]
  144. public void Concat_EmptyNever()
  145. {
  146. var scheduler = new TestScheduler();
  147. var o1 = scheduler.CreateHotObservable(
  148. OnNext(150, 1),
  149. OnCompleted<int>(230)
  150. );
  151. var o2 = scheduler.CreateHotObservable(
  152. OnNext(150, 1)
  153. );
  154. var res = scheduler.Start(() =>
  155. o1.Concat(o2)
  156. );
  157. res.Messages.AssertEqual(
  158. );
  159. o1.Subscriptions.AssertEqual(
  160. Subscribe(200, 230)
  161. );
  162. o2.Subscriptions.AssertEqual(
  163. Subscribe(230, 1000)
  164. );
  165. }
  166. [Fact]
  167. public void Concat_NeverEmpty()
  168. {
  169. var scheduler = new TestScheduler();
  170. var o1 = scheduler.CreateHotObservable(
  171. OnNext(150, 1)
  172. );
  173. var o2 = scheduler.CreateHotObservable(
  174. OnNext(150, 1),
  175. OnCompleted<int>(230)
  176. );
  177. var res = scheduler.Start(() =>
  178. o1.Concat(o2)
  179. );
  180. res.Messages.AssertEqual(
  181. );
  182. o1.Subscriptions.AssertEqual(
  183. Subscribe(200, 1000)
  184. );
  185. o2.Subscriptions.AssertEqual(
  186. );
  187. }
  188. [Fact]
  189. public void Concat_NeverNever()
  190. {
  191. var scheduler = new TestScheduler();
  192. var o1 = scheduler.CreateHotObservable(
  193. OnNext(150, 1)
  194. );
  195. var o2 = scheduler.CreateHotObservable(
  196. OnNext(150, 1)
  197. );
  198. var res = scheduler.Start(() =>
  199. o1.Concat(o2)
  200. );
  201. res.Messages.AssertEqual(
  202. );
  203. o1.Subscriptions.AssertEqual(
  204. Subscribe(200, 1000)
  205. );
  206. o2.Subscriptions.AssertEqual(
  207. );
  208. }
  209. [Fact]
  210. public void Concat_EmptyThrow()
  211. {
  212. var scheduler = new TestScheduler();
  213. var ex = new Exception();
  214. var o1 = scheduler.CreateHotObservable(
  215. OnNext(150, 1),
  216. OnCompleted<int>(230)
  217. );
  218. var o2 = scheduler.CreateHotObservable(
  219. OnNext(150, 1),
  220. OnError<int>(250, ex)
  221. );
  222. var res = scheduler.Start(() =>
  223. o1.Concat(o2)
  224. );
  225. res.Messages.AssertEqual(
  226. OnError<int>(250, ex)
  227. );
  228. o1.Subscriptions.AssertEqual(
  229. Subscribe(200, 230)
  230. );
  231. o2.Subscriptions.AssertEqual(
  232. Subscribe(230, 250)
  233. );
  234. }
  235. [Fact]
  236. public void Concat_ThrowEmpty()
  237. {
  238. var scheduler = new TestScheduler();
  239. var ex = new Exception();
  240. var o1 = scheduler.CreateHotObservable(
  241. OnNext(150, 1),
  242. OnError<int>(230, ex)
  243. );
  244. var o2 = scheduler.CreateHotObservable(
  245. OnNext(150, 1),
  246. OnCompleted<int>(250)
  247. );
  248. var res = scheduler.Start(() =>
  249. o1.Concat(o2)
  250. );
  251. res.Messages.AssertEqual(
  252. OnError<int>(230, ex)
  253. );
  254. o1.Subscriptions.AssertEqual(
  255. Subscribe(200, 230)
  256. );
  257. o2.Subscriptions.AssertEqual(
  258. );
  259. }
  260. [Fact]
  261. public void Concat_ThrowThrow()
  262. {
  263. var scheduler = new TestScheduler();
  264. var ex = new Exception();
  265. var o1 = scheduler.CreateHotObservable(
  266. OnNext(150, 1),
  267. OnError<int>(230, ex)
  268. );
  269. var o2 = scheduler.CreateHotObservable(
  270. OnNext(150, 1),
  271. OnError<int>(250, new Exception())
  272. );
  273. var res = scheduler.Start(() =>
  274. o1.Concat(o2)
  275. );
  276. res.Messages.AssertEqual(
  277. OnError<int>(230, ex)
  278. );
  279. o1.Subscriptions.AssertEqual(
  280. Subscribe(200, 230)
  281. );
  282. o2.Subscriptions.AssertEqual(
  283. );
  284. }
  285. [Fact]
  286. public void Concat_ReturnEmpty()
  287. {
  288. var scheduler = new TestScheduler();
  289. var o1 = scheduler.CreateHotObservable(
  290. OnNext(150, 1),
  291. OnNext(210, 2),
  292. OnCompleted<int>(230)
  293. );
  294. var o2 = scheduler.CreateHotObservable(
  295. OnNext(150, 1),
  296. OnCompleted<int>(250)
  297. );
  298. var res = scheduler.Start(() =>
  299. o1.Concat(o2)
  300. );
  301. res.Messages.AssertEqual(
  302. OnNext(210, 2),
  303. OnCompleted<int>(250)
  304. );
  305. o1.Subscriptions.AssertEqual(
  306. Subscribe(200, 230)
  307. );
  308. o2.Subscriptions.AssertEqual(
  309. Subscribe(230, 250)
  310. );
  311. }
  312. [Fact]
  313. public void Concat_EmptyReturn()
  314. {
  315. var scheduler = new TestScheduler();
  316. var o1 = scheduler.CreateHotObservable(
  317. OnNext(150, 1),
  318. OnCompleted<int>(230)
  319. );
  320. var o2 = scheduler.CreateHotObservable(
  321. OnNext(150, 1),
  322. OnNext(240, 2),
  323. OnCompleted<int>(250)
  324. );
  325. var res = scheduler.Start(() =>
  326. o1.Concat(o2)
  327. );
  328. res.Messages.AssertEqual(
  329. OnNext(240, 2),
  330. OnCompleted<int>(250)
  331. );
  332. o1.Subscriptions.AssertEqual(
  333. Subscribe(200, 230)
  334. );
  335. o2.Subscriptions.AssertEqual(
  336. Subscribe(230, 250)
  337. );
  338. }
  339. [Fact]
  340. public void Concat_ReturnNever()
  341. {
  342. var scheduler = new TestScheduler();
  343. var o1 = scheduler.CreateHotObservable(
  344. OnNext(150, 1),
  345. OnNext(210, 2),
  346. OnCompleted<int>(230)
  347. );
  348. var o2 = scheduler.CreateHotObservable(
  349. OnNext(150, 1)
  350. );
  351. var res = scheduler.Start(() =>
  352. o1.Concat(o2)
  353. );
  354. res.Messages.AssertEqual(
  355. OnNext(210, 2)
  356. );
  357. o1.Subscriptions.AssertEqual(
  358. Subscribe(200, 230)
  359. );
  360. o2.Subscriptions.AssertEqual(
  361. Subscribe(230, 1000)
  362. );
  363. }
  364. [Fact]
  365. public void Concat_NeverReturn()
  366. {
  367. var scheduler = new TestScheduler();
  368. var o1 = scheduler.CreateHotObservable(
  369. OnNext(150, 1)
  370. );
  371. var o2 = scheduler.CreateHotObservable(
  372. OnNext(150, 1),
  373. OnNext(210, 2),
  374. OnCompleted<int>(230)
  375. );
  376. var res = scheduler.Start(() =>
  377. o1.Concat(o2)
  378. );
  379. res.Messages.AssertEqual(
  380. );
  381. o1.Subscriptions.AssertEqual(
  382. Subscribe(200, 1000)
  383. );
  384. o2.Subscriptions.AssertEqual(
  385. );
  386. }
  387. [Fact]
  388. public void Concat_ReturnReturn()
  389. {
  390. var scheduler = new TestScheduler();
  391. var ex = new Exception();
  392. var o1 = scheduler.CreateHotObservable(
  393. OnNext(150, 1),
  394. OnNext(220, 2),
  395. OnCompleted<int>(230)
  396. );
  397. var o2 = scheduler.CreateHotObservable(
  398. OnNext(150, 1),
  399. OnNext(240, 3),
  400. OnCompleted<int>(250)
  401. );
  402. var res = scheduler.Start(() =>
  403. o1.Concat(o2)
  404. );
  405. res.Messages.AssertEqual(
  406. OnNext(220, 2),
  407. OnNext(240, 3),
  408. OnCompleted<int>(250)
  409. );
  410. o1.Subscriptions.AssertEqual(
  411. Subscribe(200, 230)
  412. );
  413. o2.Subscriptions.AssertEqual(
  414. Subscribe(230, 250)
  415. );
  416. }
  417. [Fact]
  418. public void Concat_ThrowReturn()
  419. {
  420. var scheduler = new TestScheduler();
  421. var ex = new Exception();
  422. var o1 = scheduler.CreateHotObservable(
  423. OnNext(150, 1),
  424. OnError<int>(230, ex)
  425. );
  426. var o2 = scheduler.CreateHotObservable(
  427. OnNext(150, 1),
  428. OnNext(240, 2),
  429. OnCompleted<int>(250)
  430. );
  431. var res = scheduler.Start(() =>
  432. o1.Concat(o2)
  433. );
  434. res.Messages.AssertEqual(
  435. OnError<int>(230, ex)
  436. );
  437. o1.Subscriptions.AssertEqual(
  438. Subscribe(200, 230)
  439. );
  440. o2.Subscriptions.AssertEqual(
  441. );
  442. }
  443. [Fact]
  444. public void Concat_ReturnThrow()
  445. {
  446. var scheduler = new TestScheduler();
  447. var ex = new Exception();
  448. var o1 = scheduler.CreateHotObservable(
  449. OnNext(150, 1),
  450. OnNext(220, 2),
  451. OnCompleted<int>(230)
  452. );
  453. var o2 = scheduler.CreateHotObservable(
  454. OnNext(150, 1),
  455. OnError<int>(250, ex)
  456. );
  457. var res = scheduler.Start(() =>
  458. o1.Concat(o2)
  459. );
  460. res.Messages.AssertEqual(
  461. OnNext(220, 2),
  462. OnError<int>(250, ex)
  463. );
  464. o1.Subscriptions.AssertEqual(
  465. Subscribe(200, 230)
  466. );
  467. o2.Subscriptions.AssertEqual(
  468. Subscribe(230, 250)
  469. );
  470. }
  471. [Fact]
  472. public void Concat_SomeDataSomeData()
  473. {
  474. var scheduler = new TestScheduler();
  475. var ex = new Exception();
  476. var o1 = scheduler.CreateHotObservable(
  477. OnNext(150, 1),
  478. OnNext(210, 2),
  479. OnNext(220, 3),
  480. OnCompleted<int>(225)
  481. );
  482. var o2 = scheduler.CreateHotObservable(
  483. OnNext(150, 1),
  484. OnNext(230, 4),
  485. OnNext(240, 5),
  486. OnCompleted<int>(250)
  487. );
  488. var res = scheduler.Start(() =>
  489. o1.Concat(o2)
  490. );
  491. res.Messages.AssertEqual(
  492. OnNext(210, 2),
  493. OnNext(220, 3),
  494. OnNext(230, 4),
  495. OnNext(240, 5),
  496. OnCompleted<int>(250)
  497. );
  498. o1.Subscriptions.AssertEqual(
  499. Subscribe(200, 225)
  500. );
  501. o2.Subscriptions.AssertEqual(
  502. Subscribe(225, 250)
  503. );
  504. }
  505. [Fact]
  506. public void Concat_EnumerableThrows()
  507. {
  508. var scheduler = new TestScheduler();
  509. var o = scheduler.CreateHotObservable(
  510. OnNext(150, 1),
  511. OnNext(210, 2),
  512. OnNext(220, 3),
  513. OnCompleted<int>(225)
  514. );
  515. var ex = new Exception();
  516. var xss = new MockEnumerable<IObservable<int>>(scheduler, GetObservablesForConcatThrow(o, ex));
  517. var res = scheduler.Start(() =>
  518. xss.Concat()
  519. );
  520. res.Messages.AssertEqual(
  521. OnNext(210, 2),
  522. OnNext(220, 3),
  523. OnError<int>(225, ex)
  524. );
  525. o.Subscriptions.AssertEqual(
  526. Subscribe(200, 225)
  527. );
  528. xss.Subscriptions.AssertEqual(
  529. Subscribe(200, 225)
  530. );
  531. }
  532. private IEnumerable<IObservable<int>> GetObservablesForConcatThrow(IObservable<int> first, Exception ex)
  533. {
  534. yield return first;
  535. throw ex;
  536. }
  537. [Fact]
  538. public void Concat_EnumerableTiming()
  539. {
  540. var scheduler = new TestScheduler();
  541. var o1 = scheduler.CreateHotObservable(
  542. OnNext(150, 1),
  543. OnNext(210, 2), // !
  544. OnNext(220, 3), // !
  545. OnCompleted<int>(230)
  546. );
  547. var o2 = scheduler.CreateColdObservable(
  548. OnNext(50, 4), // !
  549. OnNext(60, 5), // !
  550. OnNext(70, 6), // !
  551. OnCompleted<int>(80)
  552. );
  553. var o3 = scheduler.CreateHotObservable(
  554. OnNext(150, 1),
  555. OnNext(200, 2),
  556. OnNext(210, 3),
  557. OnNext(220, 4),
  558. OnNext(230, 5),
  559. OnNext(270, 6),
  560. OnNext(320, 7), // !
  561. OnNext(330, 8), // !
  562. OnCompleted<int>(340)
  563. );
  564. var xss = new MockEnumerable<ITestableObservable<int>>(scheduler, new[] { o1, o2, o3, o2 });
  565. var res = scheduler.Start(() =>
  566. xss.Select(xs => (IObservable<int>)xs).Concat()
  567. );
  568. res.Messages.AssertEqual(
  569. OnNext(210, 2),
  570. OnNext(220, 3),
  571. OnNext(280, 4),
  572. OnNext(290, 5),
  573. OnNext(300, 6),
  574. OnNext(320, 7),
  575. OnNext(330, 8),
  576. OnNext(390, 4),
  577. OnNext(400, 5),
  578. OnNext(410, 6),
  579. OnCompleted<int>(420)
  580. );
  581. o1.Subscriptions.AssertEqual(
  582. Subscribe(200, 230)
  583. );
  584. o2.Subscriptions.AssertEqual(
  585. Subscribe(230, 310),
  586. Subscribe(340, 420)
  587. );
  588. o3.Subscriptions.AssertEqual(
  589. Subscribe(310, 340)
  590. );
  591. xss.Subscriptions.AssertEqual(
  592. Subscribe(200, 420)
  593. );
  594. }
  595. [Fact]
  596. public void Concat_Enumerable_Dispose()
  597. {
  598. var scheduler = new TestScheduler();
  599. var o1 = scheduler.CreateHotObservable(
  600. OnNext(150, 1),
  601. OnNext(210, 2),
  602. OnNext(220, 3),
  603. OnCompleted<int>(230)
  604. );
  605. var o2 = scheduler.CreateHotObservable(
  606. OnNext(150, 1),
  607. OnNext(200, 2),
  608. OnNext(210, 3),
  609. OnNext(240, 4),
  610. OnNext(270, 5),
  611. OnNext(320, 6),
  612. OnNext(330, 7),
  613. OnCompleted<int>(340)
  614. );
  615. var xss = new MockEnumerable<ITestableObservable<int>>(scheduler, new[] { o1, o2 });
  616. var res = scheduler.Start(() =>
  617. xss.Select(xs => (IObservable<int>)xs).Concat(),
  618. 300
  619. );
  620. res.Messages.AssertEqual(
  621. OnNext(210, 2),
  622. OnNext(220, 3),
  623. OnNext(240, 4),
  624. OnNext(270, 5)
  625. );
  626. o1.Subscriptions.AssertEqual(
  627. Subscribe(200, 230)
  628. );
  629. o2.Subscriptions.AssertEqual(
  630. Subscribe(230, 300)
  631. );
  632. xss.Subscriptions.AssertEqual(
  633. Subscribe(200, 300)
  634. );
  635. }
  636. [Fact]
  637. public void Concat_Optimization_DeferEvalTiming()
  638. {
  639. var scheduler = new TestScheduler();
  640. var o1 = scheduler.CreateHotObservable(
  641. OnNext(150, 1),
  642. OnNext(210, 2),
  643. OnNext(220, 3),
  644. OnCompleted<int>(230)
  645. );
  646. var o2 = scheduler.CreateColdObservable(
  647. OnNext(10, 4),
  648. OnNext(20, 5),
  649. OnNext(30, 6),
  650. OnCompleted<int>(40)
  651. );
  652. var invoked = default(long);
  653. var xs = o1;
  654. var ys = Observable.Defer(() => { invoked = scheduler.Clock; return o2; });
  655. var res = scheduler.Start(() =>
  656. xs.Concat(ys)
  657. );
  658. res.Messages.AssertEqual(
  659. OnNext(210, 2),
  660. OnNext(220, 3),
  661. OnNext(240, 4),
  662. OnNext(250, 5),
  663. OnNext(260, 6),
  664. OnCompleted<int>(270)
  665. );
  666. o1.Subscriptions.AssertEqual(
  667. Subscribe(200, 230)
  668. );
  669. o2.Subscriptions.AssertEqual(
  670. Subscribe(230, 270)
  671. );
  672. Assert.Equal(230, invoked);
  673. }
  674. [Fact]
  675. public void Concat_Optimization_DeferExceptionPropagation()
  676. {
  677. var scheduler = new TestScheduler();
  678. var o1 = scheduler.CreateHotObservable(
  679. OnNext(150, 1),
  680. OnNext(210, 2),
  681. OnCompleted<int>(220)
  682. );
  683. var ex = new Exception();
  684. var invoked = default(long);
  685. var xs = o1;
  686. var ys = Observable.Defer(new Func<IObservable<int>>(() => { invoked = scheduler.Clock; throw ex; }));
  687. var res = scheduler.Start(() =>
  688. xs.Concat(ys)
  689. );
  690. res.Messages.AssertEqual(
  691. OnNext(210, 2),
  692. OnError<int>(220, ex)
  693. );
  694. o1.Subscriptions.AssertEqual(
  695. Subscribe(200, 220)
  696. );
  697. Assert.Equal(220, invoked);
  698. }
  699. #if !NO_PERF
  700. [Fact]
  701. public void Concat_TailRecursive1()
  702. {
  703. var create = 0L;
  704. var start = 200L;
  705. var end = 1000L;
  706. var scheduler = new TestScheduler();
  707. var o = scheduler.CreateColdObservable(
  708. OnNext(10, 1),
  709. OnNext(20, 2),
  710. OnNext(30, 3),
  711. OnCompleted<int>(40)
  712. );
  713. IObservable<int> f() => Observable.Defer(() => o.Concat(f()));
  714. var res = scheduler.Start(() => f(), create, start, end);
  715. var expected = new List<Recorded<Notification<int>>>();
  716. var t = start;
  717. while (t <= end)
  718. {
  719. var n = (t - start) / 10;
  720. if (n % 4 != 0)
  721. {
  722. expected.Add(OnNext(t, (int)(n % 4)));
  723. }
  724. t += 10;
  725. }
  726. res.Messages.AssertEqual(expected);
  727. }
  728. #if !NO_THREAD && !NETCOREAPP1_1 && !NETCOREAPP1_0
  729. [Fact]
  730. public void Concat_TailRecursive2()
  731. {
  732. IObservable<int> f(int x) => Observable.Defer(() => Observable.Return(x, ThreadPoolScheduler.Instance).Concat(f(x + 1)));
  733. var lst = new List<int>();
  734. f(0).Select(x => new StackTrace().FrameCount).Take(10).ForEach(lst.Add);
  735. Assert.True(lst.Last() - lst.First() < 10);
  736. }
  737. #endif
  738. #endif
  739. [Fact]
  740. public void Concat_Task()
  741. {
  742. var tss = Observable.Concat(new[] { Task.Factory.StartNew(() => 1), Task.Factory.StartNew(() => 2), Task.Factory.StartNew(() => 3) }.ToObservable());
  743. var res = tss.ToArray().Single();
  744. Assert.True(res.SequenceEqual(new[] { 1, 2, 3 }));
  745. }
  746. }
  747. }