MergeTest.cs 56 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using Microsoft.Reactive.Testing;
  14. using ReactiveTests.Dummies;
  15. using Xunit;
  16. namespace ReactiveTests.Tests
  17. {
  18. public class MergeTest : ReactiveTest
  19. {
  /// <summary>
  /// Asserts that each Merge call below throws <see cref="ArgumentNullException"/> when one of its
  /// arguments (a source, a collection of sources, or a scheduler) is null.
  /// </summary>
  [Fact]
  public void Merge_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;

      // Null scheduler passed first, then last.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(default(IScheduler), source, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(source, source, default(IScheduler)));

      // Null second argument, then null first argument.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(source, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(default(IObservable<int>), source));

      // Null array and null enumerable of sources.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IObservable<int>[])null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IEnumerable<IObservable<int>>)null));

      // Extension-method form with a scheduler: null receiver, then null second source.
      ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).Merge(source, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.Merge(default, DummyScheduler.Instance));

      // Collection plus scheduler: null collection, then null scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IEnumerable<IObservable<int>>)null, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(new IObservable<int>[0], default(IScheduler)));

      // Null observable-of-observables, null array after a scheduler, null observable-of-tasks.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IObservable<IObservable<int>>)null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(DummyScheduler.Instance, (IObservable<int>[])null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IObservable<Task<int>>)null));
  }
  /// <summary>
  /// Merges three single-value sources without passing a scheduler and asserts that the collected
  /// list is 42, 43, 44 in that order.
  /// </summary>
  [Fact]
  public void Merge_DefaultScheduler()
  {
      var expected = new[] { 42, 43, 44 };

      var merged = Observable.Merge(Observable.Return(42), Observable.Return(43), Observable.Return(44));
      var actual = merged.ToList().Single();

      Assert.True(expected.SequenceEqual(actual));
  }
  /// <summary>
  /// Merges two hot sources whose only notification is at time 150 and asserts that the result
  /// records no messages while both sources stay subscribed until time 1000.
  /// </summary>
  [Fact]
  public void Merge_Never2()
  {
      var scheduler = new TestScheduler();
      var first = scheduler.CreateHotObservable(OnNext(150, 1));
      var second = scheduler.CreateHotObservable(OnNext(150, 1));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, first, second));

      observed.Messages.AssertEqual();
      first.Subscriptions.AssertEqual(Subscribe(201, 1000));
      second.Subscriptions.AssertEqual(Subscribe(202, 1000));
  }
  /// <summary>
  /// Three-source variant of the "never" case: no messages are expected and the sources are
  /// subscribed at 201, 202 and 203 respectively, each until time 1000.
  /// </summary>
  [Fact]
  public void Merge_Never3()
  {
      var scheduler = new TestScheduler();
      var first = scheduler.CreateHotObservable(OnNext(150, 1));
      var second = scheduler.CreateHotObservable(OnNext(150, 1));
      var third = scheduler.CreateHotObservable(OnNext(150, 1));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, first, second, third));

      observed.Messages.AssertEqual();
      first.Subscriptions.AssertEqual(Subscribe(201, 1000));
      second.Subscriptions.AssertEqual(Subscribe(202, 1000));
      third.Subscriptions.AssertEqual(Subscribe(203, 1000));
  }
  /// <summary>
  /// Merges two sources that complete at 210 and 230 without values after subscription;
  /// the merged sequence is expected to complete at 230.
  /// </summary>
  [Fact]
  public void Merge_Empty2()
  {
      var scheduler = new TestScheduler();
      var first = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(210));
      var second = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(230));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, first, second));

      observed.Messages.AssertEqual(OnCompleted<int>(230));
      first.Subscriptions.AssertEqual(Subscribe(201, 210));
      second.Subscriptions.AssertEqual(Subscribe(202, 230));
  }
  /// <summary>
  /// Merges three sources completing at 210, 230 and 240; the merged sequence is expected to
  /// complete at 240, with each source unsubscribed at its own completion time.
  /// </summary>
  [Fact]
  public void Merge_Empty3()
  {
      var scheduler = new TestScheduler();
      var first = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(210));
      var second = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(230));
      var third = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(240));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, first, second, third));

      observed.Messages.AssertEqual(OnCompleted<int>(240));
      first.Subscriptions.AssertEqual(Subscribe(201, 210));
      second.Subscriptions.AssertEqual(Subscribe(202, 230));
      third.Subscriptions.AssertEqual(Subscribe(203, 240));
  }
  /// <summary>
  /// Two empty sources where the second one completes last (240 vs. 250); the merged sequence
  /// is expected to complete at 250.
  /// </summary>
  [Fact]
  public void Merge_EmptyDelayed2_RightLast()
  {
      var scheduler = new TestScheduler();
      var left = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(240));
      var right = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(250));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, left, right));

      observed.Messages.AssertEqual(OnCompleted<int>(250));
      left.Subscriptions.AssertEqual(Subscribe(201, 240));
      right.Subscriptions.AssertEqual(Subscribe(202, 250));
  }
  /// <summary>
  /// Two empty sources where the first one completes last (250 vs. 240); the merged sequence
  /// is expected to complete at 250.
  /// </summary>
  [Fact]
  public void Merge_EmptyDelayed2_LeftLast()
  {
      var scheduler = new TestScheduler();
      var left = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(250));
      var right = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(240));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, left, right));

      observed.Messages.AssertEqual(OnCompleted<int>(250));
      left.Subscriptions.AssertEqual(Subscribe(201, 250));
      right.Subscriptions.AssertEqual(Subscribe(202, 240));
  }
  /// <summary>
  /// Three empty sources completing at 245, 250 and 240; the middle source finishes last and
  /// the merged sequence is expected to complete at 250.
  /// </summary>
  [Fact]
  public void Merge_EmptyDelayed3_MiddleLast()
  {
      var scheduler = new TestScheduler();
      var first = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));
      var middle = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(250));
      var last = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(240));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, first, middle, last));

      observed.Messages.AssertEqual(OnCompleted<int>(250));
      first.Subscriptions.AssertEqual(Subscribe(201, 245));
      middle.Subscriptions.AssertEqual(Subscribe(202, 250));
      last.Subscriptions.AssertEqual(Subscribe(203, 240));
  }
  /// <summary>
  /// Merges a source that completes at 245 with one that never terminates; no messages are
  /// expected and the non-terminating source stays subscribed until 1000.
  /// </summary>
  [Fact]
  public void Merge_EmptyNever()
  {
      var scheduler = new TestScheduler();
      var empty = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));
      var never = scheduler.CreateHotObservable(OnNext(150, 1));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, empty, never));

      observed.Messages.AssertEqual();
      empty.Subscriptions.AssertEqual(Subscribe(201, 245));
      never.Subscriptions.AssertEqual(Subscribe(202, 1000));
  }
  /// <summary>
  /// Mirror of the empty/never case with the argument order reversed: the non-terminating
  /// source comes first, the source completing at 245 second. No messages are expected.
  /// </summary>
  [Fact]
  public void Merge_NeverEmpty()
  {
      var scheduler = new TestScheduler();
      var never = scheduler.CreateHotObservable(OnNext(150, 1));
      var empty = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, never, empty));

      observed.Messages.AssertEqual();
      never.Subscriptions.AssertEqual(Subscribe(201, 1000));
      empty.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Merges a source that emits 2 at 210 and completes at 245 with a non-terminating source;
  /// only the value at 210 is expected, with no completion.
  /// </summary>
  [Fact]
  public void Merge_ReturnNever()
  {
      var scheduler = new TestScheduler();
      var single = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(245));
      var never = scheduler.CreateHotObservable(OnNext(150, 1));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, single, never));

      observed.Messages.AssertEqual(OnNext(210, 2));
      single.Subscriptions.AssertEqual(Subscribe(201, 245));
      never.Subscriptions.AssertEqual(Subscribe(202, 1000));
  }
  /// <summary>
  /// Mirror of the return/never case with the non-terminating source first; only the value
  /// emitted at 210 is expected, with no completion.
  /// </summary>
  [Fact]
  public void Merge_NeverReturn()
  {
      var scheduler = new TestScheduler();
      var never = scheduler.CreateHotObservable(OnNext(150, 1));
      var single = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(245));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, never, single));

      observed.Messages.AssertEqual(OnNext(210, 2));
      never.Subscriptions.AssertEqual(Subscribe(201, 1000));
      single.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Merges a source that fails at 245 with a non-terminating source; the error is expected
  /// in the output at 245 and both subscriptions are expected to end at 245.
  /// </summary>
  [Fact]
  public void Merge_ErrorNever()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var failing = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnError<int>(245, failure));
      var never = scheduler.CreateHotObservable(OnNext(150, 1));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, failing, never));

      observed.Messages.AssertEqual(
          OnNext(210, 2),
          OnError<int>(245, failure));
      failing.Subscriptions.AssertEqual(Subscribe(201, 245));
      never.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Mirror of the error/never case with the non-terminating source first; the error at 245 is
  /// expected in the output and both subscriptions are expected to end at 245.
  /// </summary>
  [Fact]
  public void Merge_NeverError()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var never = scheduler.CreateHotObservable(OnNext(150, 1));
      var failing = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnError<int>(245, failure));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, never, failing));

      observed.Messages.AssertEqual(
          OnNext(210, 2),
          OnError<int>(245, failure));
      never.Subscriptions.AssertEqual(Subscribe(201, 245));
      failing.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Merges a source completing at 245 with one that emits 2 at 210 and completes at 250;
  /// the value followed by completion at 250 is expected.
  /// </summary>
  [Fact]
  public void Merge_EmptyReturn()
  {
      var scheduler = new TestScheduler();
      var empty = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));
      var single = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(250));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, empty, single));

      observed.Messages.AssertEqual(
          OnNext(210, 2),
          OnCompleted<int>(250));
      empty.Subscriptions.AssertEqual(Subscribe(201, 245));
      single.Subscriptions.AssertEqual(Subscribe(202, 250));
  }
  /// <summary>
  /// Mirror of the empty/return case with the value-producing source first; the value at 210
  /// followed by completion at 250 is expected.
  /// </summary>
  [Fact]
  public void Merge_ReturnEmpty()
  {
      var scheduler = new TestScheduler();
      var single = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(250));
      var empty = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, single, empty));

      observed.Messages.AssertEqual(
          OnNext(210, 2),
          OnCompleted<int>(250));
      single.Subscriptions.AssertEqual(Subscribe(201, 250));
      empty.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Merges two sources whose values alternate in time (even values on one, odd on the other)
  /// and asserts that the output interleaves them by timestamp and completes at 250.
  /// </summary>
  [Fact]
  public void Merge_Lots2()
  {
      var scheduler = new TestScheduler();
      var evens = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 4),
          OnNext(230, 6),
          OnNext(240, 8),
          OnCompleted<int>(245));
      var odds = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 3),
          OnNext(225, 5),
          OnNext(235, 7),
          OnNext(245, 9),
          OnCompleted<int>(250));

      var observed = scheduler.Start(() => Observable.Merge(scheduler, evens, odds));

      observed.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(220, 4),
          OnNext(225, 5),
          OnNext(230, 6),
          OnNext(235, 7),
          OnNext(240, 8),
          OnNext(245, 9),
          OnCompleted<int>(250));
      evens.Subscriptions.AssertEqual(Subscribe(201, 245));
      odds.Subscriptions.AssertEqual(Subscribe(202, 250));
  }
  /// <summary>
  /// Merges an array of three sources via the extension method that takes a scheduler and
  /// asserts that values 2 through 9 arrive in timestamp order, with completion at 250.
  /// </summary>
  [Fact]
  public void Merge_Lots3()
  {
      var scheduler = new TestScheduler();
      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(225, 5),
          OnNext(240, 8),
          OnCompleted<int>(245));
      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 3),
          OnNext(230, 6),
          OnNext(245, 9),
          OnCompleted<int>(250));
      var third = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 4),
          OnNext(235, 7),
          OnCompleted<int>(240));

      var observed = scheduler.Start(() => new[] { first, second, third }.Merge(scheduler));

      observed.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(220, 4),
          OnNext(225, 5),
          OnNext(230, 6),
          OnNext(235, 7),
          OnNext(240, 8),
          OnNext(245, 9),
          OnCompleted<int>(250));
      first.Subscriptions.AssertEqual(Subscribe(201, 245));
      second.Subscriptions.AssertEqual(Subscribe(202, 250));
      third.Subscriptions.AssertEqual(Subscribe(203, 240));
  }
  /// <summary>
  /// Generates ten message lists of increasing length, merges hot observables built from them,
  /// and checks the merged output against an expectation computed from the same lists:
  /// every OnNext at or after time 200 ordered by time, followed by the latest OnCompleted.
  /// </summary>
  [Fact]
  public void Merge_LotsMore()
  {
      const int SourceCount = 10;
      var inputs = new List<List<Recorded<Notification<int>>>>();

      for (var sourceIndex = 0; sourceIndex < SourceCount; sourceIndex++)
      {
          // Every list starts with a message at 150, which the time filter below excludes.
          var messages = new List<Recorded<Notification<int>>> { OnNext(150, 1) };
          var time = (ushort)(301 + sourceIndex);

          for (var k = 0; k < sourceIndex; k++)
          {
              // The timestamp is advanced first, then used for the message being added.
              time += (ushort)(k * 5);
              messages.Add(OnNext(time, k + sourceIndex + 2));
          }

          messages.Add(OnCompleted<int>((ushort)(time + SourceCount - sourceIndex)));
          inputs.Add(messages);
      }

      var allMessages = inputs.SelectMany(messages => messages).ToArray();

      var expectedValues = allMessages
          .Where(n => n.Time >= 200)
          .Where(n => n.Value.Kind == NotificationKind.OnNext)
          .OrderBy(n => n.Time)
          .ToList();

      var expectedCompletion = allMessages
          .Where(n => n.Time >= 200)
          .Where(n => n.Value.Kind == NotificationKind.OnCompleted)
          .OrderByDescending(n => n.Time)
          .First();

      var scheduler = new TestScheduler();

      // Last ToArray: got to create the hot observables *now*
      var sources = inputs
          .Select(messages => (IObservable<int>)scheduler.CreateHotObservable(messages.ToArray()))
          .ToArray();

      var observed = scheduler.Start(() => sources.Merge(scheduler));

      Assert.True(expectedValues.Count + 1 == observed.Messages.Count, "length");

      for (var index = 0; index < expectedValues.Count; index++)
      {
          var actual = observed.Messages[index];
          var expected = expectedValues[index];

          Assert.True(actual.Time == expected.Time);
          Assert.True(actual.Value.Kind == NotificationKind.OnNext);
          Assert.True(actual.Value.Value == expected.Value.Value);
      }

      var terminal = observed.Messages[expectedValues.Count];
      Assert.True(terminal.Value.Kind == NotificationKind.OnCompleted && terminal.Time == expectedCompletion.Time, "complete");
  }
  /// <summary>
  /// Merges a source that fails at 245 with one that would complete at 250; both values are
  /// expected before the error at 245, and both subscriptions are expected to end at 245.
  /// </summary>
  [Fact]
  public void Merge_ErrorLeft()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var failing = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnError<int>(245, failure));
      var healthy = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 3),
          OnCompleted<int>(250));

      var observed = scheduler.Start(() => Observable.Merge(failing, healthy, scheduler));

      observed.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(215, 3),
          OnError<int>(245, failure));
      failing.Subscriptions.AssertEqual(Subscribe(201, 245));
      healthy.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Asserts that an error from one source at 210 ends the subscription to the other source
  /// at 210 as well, so that source's later value at 220 never reaches the output.
  /// </summary>
  [Fact]
  public void Merge_ErrorCausesDisposal()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var failing = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(210, failure)); //!
      var other = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 1), // should not come
          OnCompleted<int>(230));

      var observed = scheduler.Start(() => Observable.Merge(failing, other, scheduler));

      observed.Messages.AssertEqual(
          OnError<int>(210, failure)); //!
      failing.Subscriptions.AssertEqual(Subscribe(201, 210));
      other.Subscriptions.AssertEqual(Subscribe(202, 210));
  }
  /// <summary>
  /// Merges a hot outer sequence that emits three cold inner sequences at 300, 400 and 500.
  /// The inner sequences overlap in time; the expected output interleaves their values and
  /// completes at 650, when the last inner sequence completes.
  /// </summary>
  [Fact]
  public void Merge_ObservableOfObservable_Data()
  {
      var scheduler = new TestScheduler();
      var inner1 = scheduler.CreateColdObservable(
          OnNext(10, 101),
          OnNext(20, 102),
          OnNext(110, 103),
          OnNext(120, 104),
          OnNext(210, 105),
          OnNext(220, 106),
          OnCompleted<int>(230));
      var inner2 = scheduler.CreateColdObservable(
          OnNext(10, 201),
          OnNext(20, 202),
          OnNext(30, 203),
          OnNext(40, 204),
          OnCompleted<int>(50));
      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 301),
          OnNext(20, 302),
          OnNext(30, 303),
          OnNext(40, 304),
          OnNext(120, 305),
          OnCompleted<int>(150));
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(300, inner1),
          OnNext<IObservable<int>>(400, inner2),
          OnNext<IObservable<int>>(500, inner3),
          OnCompleted<IObservable<int>>(600));

      var observed = scheduler.Start(() => outer.Merge());

      observed.Messages.AssertEqual(
          OnNext(310, 101),
          OnNext(320, 102),
          OnNext(410, 103),
          OnNext(410, 201),
          OnNext(420, 104),
          OnNext(420, 202),
          OnNext(430, 203),
          OnNext(440, 204),
          OnNext(510, 105),
          OnNext(510, 301),
          OnNext(520, 106),
          OnNext(520, 302),
          OnNext(530, 303),
          OnNext(540, 304),
          OnNext(620, 305),
          OnCompleted<int>(650));

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      outer.Subscriptions.AssertEqual(Subscribe(200, 600));
#else
      outer.Subscriptions.AssertEqual(Subscribe(200, 650));
#endif

      inner1.Subscriptions.AssertEqual(Subscribe(300, 530));
      inner2.Subscriptions.AssertEqual(Subscribe(400, 450));
      inner3.Subscriptions.AssertEqual(Subscribe(500, 650));
  }
  /// <summary>
  /// Variant of the observable-of-observables data test in which the inner sequences emit
  /// their values in non-overlapping time windows; the merged output is expected to complete
  /// at 600, the completion time of the outer sequence.
  /// </summary>
  [Fact]
  public void Merge_ObservableOfObservable_Data_NonOverlapped()
  {
      var scheduler = new TestScheduler();
      var inner1 = scheduler.CreateColdObservable(
          OnNext(10, 101),
          OnNext(20, 102),
          OnCompleted<int>(230));
      var inner2 = scheduler.CreateColdObservable(
          OnNext(10, 201),
          OnNext(20, 202),
          OnNext(30, 203),
          OnNext(40, 204),
          OnCompleted<int>(50));
      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 301),
          OnNext(20, 302),
          OnNext(30, 303),
          OnNext(40, 304),
          OnCompleted<int>(50));
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(300, inner1),
          OnNext<IObservable<int>>(400, inner2),
          OnNext<IObservable<int>>(500, inner3),
          OnCompleted<IObservable<int>>(600));

      var observed = scheduler.Start(() => outer.Merge());

      observed.Messages.AssertEqual(
          OnNext(310, 101),
          OnNext(320, 102),
          OnNext(410, 201),
          OnNext(420, 202),
          OnNext(430, 203),
          OnNext(440, 204),
          OnNext(510, 301),
          OnNext(520, 302),
          OnNext(530, 303),
          OnNext(540, 304),
          OnCompleted<int>(600));
      outer.Subscriptions.AssertEqual(Subscribe(200, 600));
      inner1.Subscriptions.AssertEqual(Subscribe(300, 530));
      inner2.Subscriptions.AssertEqual(Subscribe(400, 450));
      inner3.Subscriptions.AssertEqual(Subscribe(500, 550));
  }
  /// <summary>
  /// The second inner sequence fails 50 ticks after it is subscribed at 400. The error is
  /// expected in the output at 450, the outer and first inner subscriptions are expected to
  /// end at 450, and the third inner sequence is expected never to be subscribed.
  /// </summary>
  [Fact]
  public void Merge_ObservableOfObservable_InnerThrows()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var inner1 = scheduler.CreateColdObservable(
          OnNext(10, 101),
          OnNext(20, 102),
          OnNext(110, 103),
          OnNext(120, 104),
          OnNext(210, 105),
          OnNext(220, 106),
          OnCompleted<int>(230));
      var inner2 = scheduler.CreateColdObservable(
          OnNext(10, 201),
          OnNext(20, 202),
          OnNext(30, 203),
          OnNext(40, 204),
          OnError<int>(50, failure));
      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 301),
          OnNext(20, 302),
          OnNext(30, 303),
          OnNext(40, 304),
          OnCompleted<int>(150));
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(300, inner1),
          OnNext<IObservable<int>>(400, inner2),
          OnNext<IObservable<int>>(500, inner3),
          OnCompleted<IObservable<int>>(600));

      var observed = scheduler.Start(() => outer.Merge());

      observed.Messages.AssertEqual(
          OnNext(310, 101),
          OnNext(320, 102),
          OnNext(410, 103),
          OnNext(410, 201),
          OnNext(420, 104),
          OnNext(420, 202),
          OnNext(430, 203),
          OnNext(440, 204),
          OnError<int>(450, failure));
      outer.Subscriptions.AssertEqual(Subscribe(200, 450));
      inner1.Subscriptions.AssertEqual(Subscribe(300, 450));
      inner2.Subscriptions.AssertEqual(Subscribe(400, 450));
      inner3.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// The outer sequence fails at 500 after emitting two inner sequences. The error is expected
  /// in the output at 500 and the still-running first inner subscription is expected to end at
  /// 500; the second inner sequence has already completed at 450.
  /// </summary>
  [Fact]
  public void Merge_ObservableOfObservable_OuterThrows()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var inner1 = scheduler.CreateColdObservable(
          OnNext(10, 101),
          OnNext(20, 102),
          OnNext(110, 103),
          OnNext(120, 104),
          OnNext(210, 105),
          OnNext(220, 106),
          OnCompleted<int>(230));
      var inner2 = scheduler.CreateColdObservable(
          OnNext(10, 201),
          OnNext(20, 202),
          OnNext(30, 203),
          OnNext(40, 204),
          OnCompleted<int>(50));
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(300, inner1),
          OnNext<IObservable<int>>(400, inner2),
          OnError<IObservable<int>>(500, failure));

      var observed = scheduler.Start(() => outer.Merge());

      observed.Messages.AssertEqual(
          OnNext(310, 101),
          OnNext(320, 102),
          OnNext(410, 103),
          OnNext(410, 201),
          OnNext(420, 104),
          OnNext(420, 202),
          OnNext(430, 203),
          OnNext(440, 204),
          OnError<int>(500, failure));
      outer.Subscriptions.AssertEqual(Subscribe(200, 500));
      inner1.Subscriptions.AssertEqual(Subscribe(300, 500));
      inner2.Subscriptions.AssertEqual(Subscribe(400, 450));
  }
  /// <summary>
  /// Merges two single-value sources with the two-argument extension method (no scheduler)
  /// and asserts that, once sorted, the values are 1 and 2.
  /// </summary>
  [Fact]
  public void Merge_Binary_DefaultScheduler()
  {
      var merged = Observable.Return(1).Merge(Observable.Return(2));

      // The values are sorted before comparing, so arrival order is not asserted.
      var sorted = merged.ToEnumerable().OrderBy(x => x);

      Assert.True(sorted.SequenceEqual(new[] { 1, 2 }));
  }
  /// <summary>
  /// Merges two single-value sources through the static Merge call (no scheduler) and asserts
  /// that, once sorted, the values are 1 and 2.
  /// </summary>
  [Fact]
  public void Merge_Params_DefaultScheduler()
  {
      var merged = Observable.Merge(Observable.Return(1), Observable.Return(2));

      // The values are sorted before comparing, so arrival order is not asserted.
      var sorted = merged.ToEnumerable().OrderBy(x => x);

      Assert.True(sorted.SequenceEqual(new[] { 1, 2 }));
  }
  /// <summary>
  /// Merges an enumerable of two single-value sources (no scheduler) and asserts that, once
  /// sorted, the values are 1 and 2.
  /// </summary>
  [Fact]
  public void Merge_IEnumerableOfIObservable_DefaultScheduler()
  {
      // The cast selects the overload that accepts an enumerable of sources.
      var merged = Observable.Merge((IEnumerable<IObservable<int>>)new[] { Observable.Return(1), Observable.Return(2) });

      // The values are sorted before comparing, so arrival order is not asserted.
      var sorted = merged.ToEnumerable().OrderBy(x => x);

      Assert.True(sorted.SequenceEqual(new[] { 1, 2 }));
  }
  /// <summary>
  /// Argument validation for the Merge overloads that take a maximum-concurrency value, and for
  /// Concat over nested sequences: null inputs are expected to throw
  /// <see cref="ArgumentNullException"/> and a concurrency of 0 is expected to throw
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void MergeConcat_ArgumentChecking()
  {
      // Enumerable of sources with an explicit scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Merge(default(IEnumerable<IObservable<int>>), 1, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Merge(DummyEnumerable<IObservable<int>>.Instance, 0, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Merge(DummyEnumerable<IObservable<int>>.Instance, 1, null));

      // Enumerable of sources without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Merge(default(IEnumerable<IObservable<int>>), 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Merge(DummyEnumerable<IObservable<int>>.Instance, 0));

      // Observable of sources.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Merge(default(IObservable<IObservable<int>>), 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Merge(DummyObservable<IObservable<int>>.Instance, 0));

      // Concat over a null observable of observables and a null observable of tasks.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Concat(default(IObservable<IObservable<int>>)));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Concat(default(IObservable<Task<int>>)));
  }
  /// <summary>
  /// Merges an enumerable of three ranges with a maximum concurrency of 1 and an explicit
  /// scheduler (<c>DefaultScheduler.Instance</c>), and asserts that the ranges are emitted
  /// one after another: 10..12, 20..22, 30..32.
  /// </summary>
  [Fact]
  public void MergeConcat_Enumerable_Scheduler()
  {
      // This is the scheduler-taking overload, matching the "_Scheduler" suffix in the test name.
      var b = Enumerable.Range(1, 3).Select(x => Observable.Range(x * 10, 3)).Merge(1, DefaultScheduler.Instance)
          .SequenceEqual(new[] { 10, 11, 12, 20, 21, 22, 30, 31, 32 }.ToObservable())
          .First();

      Assert.True(b);
  }

  /// <summary>
  /// Merges an enumerable of three ranges with a maximum concurrency of 1 and no explicit
  /// scheduler, and asserts that the ranges are emitted one after another:
  /// 10..12, 20..22, 30..32.
  /// </summary>
  [Fact]
  public void MergeConcat_Enumerable()
  {
      // This is the overload without a scheduler argument.
      var b = Enumerable.Range(1, 3).Select(x => Observable.Range(x * 10, 3)).Merge(1)
          .SequenceEqual(new[] { 10, 11, 12, 20, 21, 22, 30, 31, 32 }.ToObservable())
          .First();

      Assert.True(b);
  }
  /// <summary>
  /// Applies Concat to an observable of three ranges and asserts that the result equals
  /// 10..12, 20..22, 30..32 in that order.
  /// </summary>
  [Fact]
  public void MergeConcat_Default()
  {
      var expected = new[] { 10, 11, 12, 20, 21, 22, 30, 31, 32 }.ToObservable();
      var concatenated = Observable.Range(1, 3).Select(x => Observable.Range(x * 10, 3)).Concat();

      var matches = concatenated.SequenceEqual(expected).First();

      Assert.True(matches);
  }
  /// <summary>
  /// Merges four cold inner sequences with a maximum concurrency of 2. The subscription
  /// assertions show the third and fourth inner sequences being subscribed only at 350 and
  /// 460, the times at which the first and second inner subscriptions end.
  /// </summary>
  [Fact]
  public void MergeConcat_Basic()
  {
      var scheduler = new TestScheduler();
      var inner1 = scheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140));
      var inner2 = scheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(200));
      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnCompleted<int>(130));
      var inner4 = scheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300));
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, inner1),
          OnNext<IObservable<int>>(260, inner2),
          OnNext<IObservable<int>>(270, inner3),
          OnNext<IObservable<int>>(320, inner4),
          OnCompleted<IObservable<int>>(400));

      var observed = scheduler.Start(() => outer.Merge(2));

      observed.Messages.AssertEqual(
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(440, 7),
          OnNext(460, 8),
          OnNext(670, 9),
          OnNext(700, 10),
          OnCompleted<int>(760));

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      outer.Subscriptions.AssertEqual(Subscribe(200, 400));
#else
      outer.Subscriptions.AssertEqual(Subscribe(200, 760));
#endif

      inner1.Subscriptions.AssertEqual(Subscribe(210, 350));
      inner2.Subscriptions.AssertEqual(Subscribe(260, 460));
      inner3.Subscriptions.AssertEqual(Subscribe(350, 480));
      inner4.Subscriptions.AssertEqual(Subscribe(460, 760));
  }
  /// <summary>
  /// Same setup as the basic max-concurrency-2 test, except the second inner sequence completes
  /// later (300 ticks after subscription). The fourth inner sequence is then expected to be
  /// subscribed at 480, when the third inner subscription ends, and the output completes at 780.
  /// </summary>
  [Fact]
  public void MergeConcat_Basic_Long()
  {
      var scheduler = new TestScheduler();
      var inner1 = scheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140));
      var inner2 = scheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(300));
      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnCompleted<int>(130));
      var inner4 = scheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300));
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, inner1),
          OnNext<IObservable<int>>(260, inner2),
          OnNext<IObservable<int>>(270, inner3),
          OnNext<IObservable<int>>(320, inner4),
          OnCompleted<IObservable<int>>(400));

      var observed = scheduler.Start(() => outer.Merge(2));

      observed.Messages.AssertEqual(
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(440, 7),
          OnNext(460, 8),
          OnNext(690, 9),
          OnNext(720, 10),
          OnCompleted<int>(780));

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      outer.Subscriptions.AssertEqual(Subscribe(200, 400));
#else
      outer.Subscriptions.AssertEqual(Subscribe(200, 780));
#endif

      inner1.Subscriptions.AssertEqual(Subscribe(210, 350));
      inner2.Subscriptions.AssertEqual(Subscribe(260, 560));
      inner3.Subscriptions.AssertEqual(Subscribe(350, 480));
      inner4.Subscriptions.AssertEqual(Subscribe(480, 780));
  }
  /// <summary>
  /// Merges four cold inner sequences with a maximum concurrency of 3. The subscription
  /// assertions show every inner sequence being subscribed at the time the outer sequence
  /// emits it (210, 260, 270, 420); the output completes at 720.
  /// </summary>
  [Fact]
  public void MergeConcat_Basic_Wide()
  {
      var scheduler = new TestScheduler();
      var inner1 = scheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140));
      var inner2 = scheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(300));
      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnCompleted<int>(130));
      var inner4 = scheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300));
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, inner1),
          OnNext<IObservable<int>>(260, inner2),
          OnNext<IObservable<int>>(270, inner3),
          OnNext<IObservable<int>>(420, inner4),
          OnCompleted<IObservable<int>>(450));

      var observed = scheduler.Start(() => outer.Merge(3));

      observed.Messages.AssertEqual(
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(280, 6),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 7),
          OnNext(380, 8),
          OnNext(630, 9),
          OnNext(660, 10),
          OnCompleted<int>(720));

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      outer.Subscriptions.AssertEqual(Subscribe(200, 450));
#else
      outer.Subscriptions.AssertEqual(Subscribe(200, 720));
#endif

      inner1.Subscriptions.AssertEqual(Subscribe(210, 350));
      inner2.Subscriptions.AssertEqual(Subscribe(260, 560));
      inner3.Subscriptions.AssertEqual(Subscribe(270, 400));
      inner4.Subscriptions.AssertEqual(Subscribe(420, 720));
  }
  /// <summary>
  /// Runs <c>xs.Merge(3)</c> where the outer sequence completes at 750, after the
  /// last inner sequence has already finished at 720. The merged result is expected
  /// to complete at 750, and the outer subscription to last until then.
  /// </summary>
  [Fact]
  public void MergeConcat_Basic_Late()
  {
      var scheduler = new TestScheduler();

      // Cold inner sequences: their times are relative to the moment of subscription.
      var first = scheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140)
      );

      var second = scheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(300)
      );

      var third = scheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnCompleted<int>(130)
      );

      var fourth = scheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300)
      );

      // Hot outer sequence; note the late completion at 750.
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, first),
          OnNext<IObservable<int>>(260, second),
          OnNext<IObservable<int>>(270, third),
          OnNext<IObservable<int>>(420, fourth),
          OnCompleted<IObservable<int>>(750)
      );

      var results = scheduler.Start(() => outer.Merge(3));

      var expected = new[]
      {
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(280, 6),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 7),
          OnNext(380, 8),
          OnNext(630, 9),
          OnNext(660, 10),
          OnCompleted<int>(750)
      };
      results.Messages.AssertEqual(expected);

      outer.Subscriptions.AssertEqual(Subscribe(200, 750));

      first.Subscriptions.AssertEqual(Subscribe(210, 350));
      second.Subscriptions.AssertEqual(Subscribe(260, 560));
      third.Subscriptions.AssertEqual(Subscribe(270, 400));
      fourth.Subscriptions.AssertEqual(Subscribe(420, 720));
  }
  /// <summary>
  /// Runs <c>xs.Merge(2)</c> and disposes the subscription at 450. Only the messages
  /// up to that point are expected, the inner sequences still running are expected
  /// to be unsubscribed at 450, and the fourth inner sequence is expected to never
  /// be subscribed.
  /// </summary>
  [Fact]
  public void MergeConcat_Disposed()
  {
      var scheduler = new TestScheduler();

      // Cold inner sequences: their times are relative to the moment of subscription.
      var first = scheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140)
      );

      var second = scheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(200)
      );

      var third = scheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnCompleted<int>(130)
      );

      var fourth = scheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300)
      );

      // Hot outer sequence handing out the inner sequences at absolute times.
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, first),
          OnNext<IObservable<int>>(260, second),
          OnNext<IObservable<int>>(270, third),
          OnNext<IObservable<int>>(320, fourth),
          OnCompleted<IObservable<int>>(400)
      );

      // The second argument is the tick at which the subscription is disposed.
      var results = scheduler.Start(() => outer.Merge(2), 450);

      var expected = new[]
      {
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(440, 7)
      };
      results.Messages.AssertEqual(expected);

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      const long outerUnsubscribedAt = 400;
#else
      const long outerUnsubscribedAt = 450;
#endif
      outer.Subscriptions.AssertEqual(Subscribe(200, outerUnsubscribedAt));

      first.Subscriptions.AssertEqual(Subscribe(210, 350));
      second.Subscriptions.AssertEqual(Subscribe(260, 450));
      third.Subscriptions.AssertEqual(Subscribe(350, 450));

      // No subscription is expected for the fourth inner sequence.
      fourth.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Runs <c>xs.Merge(2)</c> where the outer sequence fails at 400. The merged
  /// result is expected to surface that same exception at 400, with all active
  /// subscriptions ending at 400 and the fourth inner sequence never subscribed.
  /// </summary>
  [Fact]
  public void MergeConcat_OuterError()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      // Cold inner sequences: their times are relative to the moment of subscription.
      var first = scheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140)
      );

      var second = scheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(200)
      );

      var third = scheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnCompleted<int>(130)
      );

      var fourth = scheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300)
      );

      // Hot outer sequence that terminates with an error instead of completing.
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, first),
          OnNext<IObservable<int>>(260, second),
          OnNext<IObservable<int>>(270, third),
          OnNext<IObservable<int>>(320, fourth),
          OnError<IObservable<int>>(400, failure)
      );

      var results = scheduler.Start(() => outer.Merge(2));

      var expected = new[]
      {
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 6),
          OnError<int>(400, failure)
      };
      results.Messages.AssertEqual(expected);

      outer.Subscriptions.AssertEqual(Subscribe(200, 400));

      first.Subscriptions.AssertEqual(Subscribe(210, 350));
      second.Subscriptions.AssertEqual(Subscribe(260, 400));
      third.Subscriptions.AssertEqual(Subscribe(350, 400));

      // No subscription is expected for the fourth inner sequence.
      fourth.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Runs <c>xs.Merge(2)</c> where the third inner sequence fails 140 ticks after
  /// it is subscribed. The merged result is expected to surface that same exception
  /// at 490, ending the subscriptions still active at that tick.
  /// </summary>
  [Fact]
  public void MergeConcat_InnerError()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      // Cold inner sequences: their times are relative to the moment of subscription.
      var first = scheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140)
      );

      var second = scheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(200)
      );

      // This inner sequence ends with an error rather than a completion.
      var third = scheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnError<int>(140, failure)
      );

      var fourth = scheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300)
      );

      // Hot outer sequence handing out the inner sequences at absolute times.
      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, first),
          OnNext<IObservable<int>>(260, second),
          OnNext<IObservable<int>>(270, third),
          OnNext<IObservable<int>>(320, fourth),
          OnCompleted<IObservable<int>>(400)
      );

      var results = scheduler.Start(() => outer.Merge(2));

      var expected = new[]
      {
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(440, 7),
          OnNext(460, 8),
          OnError<int>(490, failure)
      };
      results.Messages.AssertEqual(expected);

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      const long outerUnsubscribedAt = 400;
#else
      const long outerUnsubscribedAt = 490;
#endif
      outer.Subscriptions.AssertEqual(Subscribe(200, outerUnsubscribedAt));

      first.Subscriptions.AssertEqual(Subscribe(210, 350));
      second.Subscriptions.AssertEqual(Subscribe(260, 460));
      third.Subscriptions.AssertEqual(Subscribe(350, 490));
      fourth.Subscriptions.AssertEqual(Subscribe(460, 490));
  }
  /// <summary>
  /// Merges an observable sequence of three started tasks returning 1, 2 and 3 and
  /// checks that, once sorted, the collected results are exactly { 1, 2, 3 }.
  /// </summary>
  [Fact]
  public void Merge_Task()
  {
      var tasks = new[]
      {
          Task.Factory.StartNew(() => 1),
          Task.Factory.StartNew(() => 2),
          Task.Factory.StartNew(() => 3)
      };

      var merged = Observable.Merge(tasks.ToObservable());
      var values = merged.ToArray().Single();

      // Results are sorted before comparing, so arrival order does not matter.
      var sorted = values.OrderBy(v => v);
      Assert.True(sorted.SequenceEqual(new[] { 1, 2, 3 }));
  }
  /// <summary>
  /// Subscribes to the merge of two pending tasks, then completes them with 42 and
  /// 43, and checks that both values are received once the merged sequence completes.
  /// </summary>
  [Fact]
  public void Merge_TaskWithCompletionSource_RanToCompletion_Async()
  {
      var sources = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>()
      };

      var merged = Observable.Merge(Observable.Range(0, 2).Select(i => sources[i].Task));

      var received = new List<int>();
      var finished = new ManualResetEvent(false);
      merged.Subscribe(received.Add, () => finished.Set());

      // The tasks are completed only after the subscription is in place.
      sources[0].SetResult(42);
      sources[1].SetResult(43);

      finished.WaitOne();

      received.OrderBy(v => v).AssertEqual(new[] { 42, 43 });
  }
  /// <summary>
  /// Completes two tasks with 42 and 43 before the merged sequence is even created,
  /// then subscribes and checks that both values are received once it completes.
  /// </summary>
  [Fact]
  public void Merge_TaskWithCompletionSource_RanToCompletion_Sync()
  {
      var sources = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>()
      };

      // The tasks already hold their results when Merge first sees them.
      sources[0].SetResult(42);
      sources[1].SetResult(43);

      var merged = Observable.Merge(Observable.Range(0, 2).Select(i => sources[i].Task));

      var received = new List<int>();
      var finished = new ManualResetEvent(false);
      merged.Subscribe(received.Add, () => finished.Set());

      finished.WaitOne();

      received.OrderBy(v => v).AssertEqual(new[] { 42, 43 });
  }
  /// <summary>
  /// Subscribes to the merge of three pending tasks, then faults the second one, and
  /// checks that no value is received and that the observed error is the very
  /// exception instance the task was faulted with.
  /// </summary>
  [Fact]
  public void Merge_TaskWithCompletionSource_Faulted_Async()
  {
      var sources = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>()
      };

      var merged = Observable.Merge(Observable.Range(0, 3).Select(i => sources[i].Task));

      var received = new List<int>();
      Exception observed = null;
      var finished = new ManualResetEvent(false);

      // Either termination signal releases the waiting test thread.
      merged.Subscribe(
          received.Add,
          error =>
          {
              observed = error;
              finished.Set();
          },
          () => finished.Set());

      var failure = new Exception();
      sources[1].SetException(failure);

      finished.WaitOne();

      received.AssertEqual(new int[0]);
      Assert.Same(failure, observed);
  }
  /// <summary>
  /// Faults the second of three tasks before the merged sequence is created, then
  /// subscribes and checks that no value is received and that the observed error is
  /// the very exception instance the task was faulted with.
  /// </summary>
  [Fact]
  public void Merge_TaskWithCompletionSource_Faulted_Sync()
  {
      var sources = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>()
      };

      // The task is already faulted when Merge first sees it.
      var failure = new Exception();
      sources[1].SetException(failure);

      var merged = Observable.Merge(Observable.Range(0, 3).Select(i => sources[i].Task));

      var received = new List<int>();
      Exception observed = null;
      var finished = new ManualResetEvent(false);

      // Either termination signal releases the waiting test thread.
      merged.Subscribe(
          received.Add,
          error =>
          {
              observed = error;
              finished.Set();
          },
          () => finished.Set());

      finished.WaitOne();

      received.AssertEqual(new int[0]);
      Assert.Same(failure, observed);
  }
  /// <summary>
  /// Subscribes to the merge of three pending tasks, then cancels the second one, and
  /// checks that no value is received and that the observed error is a
  /// <see cref="TaskCanceledException"/> referring to the canceled task.
  /// </summary>
  [Fact]
  public void Merge_TaskWithCompletionSource_Canceled_Async()
  {
      var tcss = new TaskCompletionSource<int>[3];
      tcss[0] = new TaskCompletionSource<int>();
      tcss[1] = new TaskCompletionSource<int>();
      tcss[2] = new TaskCompletionSource<int>();

      var res = Observable.Merge(Observable.Range(0, 3).Select(x => tcss[x].Task));

      var lst = new List<int>();

      var err = default(Exception);
      var done = new ManualResetEvent(false);

      // Either termination signal releases the waiting test thread.
      res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());

      tcss[1].SetCanceled();

      done.WaitOne();

      lst.AssertEqual(new int[0]);

      // Two separate assertions, so a failure says whether the exception type or
      // the associated task was wrong, instead of a bare "expected True".
      var canceled = Assert.IsAssignableFrom<TaskCanceledException>(err);
      Assert.Same(tcss[1].Task, canceled.Task);
  }
  /// <summary>
  /// Cancels the second of three tasks before the merged sequence is created, then
  /// subscribes and checks that no value is received and that the observed error is
  /// a <see cref="TaskCanceledException"/> referring to the canceled task.
  /// </summary>
  [Fact]
  public void Merge_TaskWithCompletionSource_Canceled_Sync()
  {
      var tcss = new TaskCompletionSource<int>[3];
      tcss[0] = new TaskCompletionSource<int>();
      tcss[1] = new TaskCompletionSource<int>();
      tcss[2] = new TaskCompletionSource<int>();

      // The task is already canceled when Merge first sees it.
      tcss[1].SetCanceled();

      var res = Observable.Merge(Observable.Range(0, 3).Select(x => tcss[x].Task));

      var lst = new List<int>();

      var err = default(Exception);
      var done = new ManualResetEvent(false);

      // Either termination signal releases the waiting test thread.
      res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());

      done.WaitOne();

      lst.AssertEqual(new int[0]);

      // Two separate assertions, so a failure says whether the exception type or
      // the associated task was wrong, instead of a bare "expected True".
      var canceled = Assert.IsAssignableFrom<TaskCanceledException>(err);
      Assert.Same(tcss[1].Task, canceled.Task);
  }
  /// <summary>
  /// Feeds three task indices through a subject, completes all three tasks, and only
  /// then completes the subject. Checks that 42, 43 and 44 are all received once
  /// the merged sequence completes.
  /// </summary>
  [Fact]
  public void Merge_TaskWithCompletionSource_InnerCompleteBeforeOuter()
  {
      var outer = new Subject<int>();

      var sources = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>()
      };

      var merged = Observable.Merge(outer.Select(i => sources[i].Task));

      var received = new List<int>();
      var finished = new ManualResetEvent(false);
      merged.Subscribe(received.Add, () => finished.Set());

      // Task 1 already has its result before its index is pushed.
      sources[1].SetResult(42);

      outer.OnNext(0);
      outer.OnNext(1);
      outer.OnNext(2);

      // The remaining tasks finish before the outer sequence completes.
      sources[0].SetResult(43);
      sources[2].SetResult(44);

      outer.OnCompleted();

      finished.WaitOne();

      received.OrderBy(v => v).AssertEqual(new[] { 42, 43, 44 });
  }
  /// <summary>
  /// Feeds three task indices through a subject and completes the subject while two
  /// of the tasks are still pending, then completes those tasks. Checks that 42, 43
  /// and 44 are all received once the merged sequence completes.
  /// </summary>
  [Fact]
  public void Merge_TaskWithCompletionSource_OuterCompleteBeforeInner()
  {
      var outer = new Subject<int>();

      var sources = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>()
      };

      var merged = Observable.Merge(outer.Select(i => sources[i].Task));

      var received = new List<int>();
      var finished = new ManualResetEvent(false);
      merged.Subscribe(received.Add, () => finished.Set());

      // Task 1 already has its result before its index is pushed.
      sources[1].SetResult(42);

      outer.OnNext(0);
      outer.OnNext(1);
      outer.OnNext(2);

      // The outer sequence completes while tasks 0 and 2 are still pending.
      outer.OnCompleted();

      sources[0].SetResult(43);
      sources[2].SetResult(44);

      finished.WaitOne();

      received.OrderBy(v => v).AssertEqual(new[] { 42, 43, 44 });
  }
  /// <summary>
  /// Feeds three task indices through a subject, completes all three tasks, and then
  /// fails the subject. Checks that the observed error is the very exception
  /// instance passed to the subject.
  /// </summary>
  [Fact]
  public void Merge_Task_OnError()
  {
      var outer = new Subject<int>();

      var sources = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>()
      };

      var merged = Observable.Merge(outer.Select(i => sources[i].Task));

      var received = new List<int>();
      Exception observed = null;
      var finished = new ManualResetEvent(false);

      // Either termination signal releases the waiting test thread.
      merged.Subscribe(
          received.Add,
          error =>
          {
              observed = error;
              finished.Set();
          },
          () => finished.Set());

      // Task 1 already has its result before its index is pushed.
      sources[1].SetResult(42);

      outer.OnNext(0);
      outer.OnNext(1);
      outer.OnNext(2);

      sources[0].SetResult(43);
      sources[2].SetResult(44);

      // The outer sequence fails instead of completing.
      var failure = new Exception();
      outer.OnError(failure);

      finished.WaitOne();

      Assert.Same(failure, observed);
  }
  1579. }
  1580. }