MergeTest.cs 56 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using Microsoft.Reactive.Testing;
  14. using ReactiveTests.Dummies;
  15. using Microsoft.VisualStudio.TestTools.UnitTesting;
  16. using Assert = Xunit.Assert;
  17. namespace ReactiveTests.Tests
  18. {
  19. [TestClass]
  20. public class MergeTest : ReactiveTest
  21. {
  /// <summary>
  /// Checks that each <c>Merge</c> call below throws <see cref="ArgumentNullException"/>
  /// when one of its source, source-collection or scheduler arguments is null.
  /// </summary>
  [TestMethod]
  public void Merge_ArgumentChecking()
  {
      var dummy = DummyObservable<int>.Instance;

      // Null scheduler, passed first or last.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(default(IScheduler), dummy, dummy));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(dummy, dummy, default(IScheduler)));

      // Null sources or null source collections, without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(dummy, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(default(IObservable<int>), dummy));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IObservable<int>[])null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IEnumerable<IObservable<int>>)null));

      // Null source, collection or scheduler in the scheduler-taking forms.
      ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).Merge(dummy, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => dummy.Merge(default, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IEnumerable<IObservable<int>>)null, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(new IObservable<int>[0], default(IScheduler)));

      // Null sequence-of-sequences, null params array, null sequence-of-tasks.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IObservable<IObservable<int>>)null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge(DummyScheduler.Instance, (IObservable<int>[])null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Merge((IObservable<Task<int>>)null));
  }
  /// <summary>
  /// Merges three single-value sequences without passing a scheduler and expects
  /// the collected list to be 42, 43, 44 in that order.
  /// </summary>
  [TestMethod]
  public void Merge_DefaultScheduler()
  {
      var merged = Observable.Merge(Observable.Return(42), Observable.Return(43), Observable.Return(44));

      // ToList gathers every element into one list; Single() blocks for that list.
      var collected = merged.ToList().Single();

      var expected = new[] { 42, 43, 44 };
      Assert.True(expected.SequenceEqual(collected));
  }
  /// <summary>
  /// Merges two hot sources whose only notification is at time 150 and that never terminate.
  /// Expects no output and both subscriptions to run until 1000.
  /// </summary>
  [TestMethod]
  public void Merge_Never2()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(OnNext(150, 1));
      var second = scheduler.CreateHotObservable(OnNext(150, 1));

      var results = scheduler.Start(() => Observable.Merge(scheduler, first, second));

      // Nothing is expected to be observed.
      results.Messages.AssertEqual();

      // Sources are expected to be subscribed one tick apart.
      first.Subscriptions.AssertEqual(Subscribe(201, 1000));
      second.Subscriptions.AssertEqual(Subscribe(202, 1000));
  }
  /// <summary>
  /// Three-source variant of the never-terminating merge: no output is expected and
  /// all three subscriptions run until 1000.
  /// </summary>
  [TestMethod]
  public void Merge_Never3()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(OnNext(150, 1));
      var second = scheduler.CreateHotObservable(OnNext(150, 1));
      var third = scheduler.CreateHotObservable(OnNext(150, 1));

      var results = scheduler.Start(() => Observable.Merge(scheduler, first, second, third));

      // Nothing is expected to be observed.
      results.Messages.AssertEqual();

      // Subscription start times are expected at 201, 202 and 203.
      first.Subscriptions.AssertEqual(Subscribe(201, 1000));
      second.Subscriptions.AssertEqual(Subscribe(202, 1000));
      third.Subscriptions.AssertEqual(Subscribe(203, 1000));
  }
  /// <summary>
  /// Merges two sources that only complete (at 210 and 230).
  /// Expects a single completion at 230, the later of the two.
  /// </summary>
  [TestMethod]
  public void Merge_Empty2()
  {
      var scheduler = new TestScheduler();

      var early = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(210));
      var late = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(230));

      var results = scheduler.Start(() => Observable.Merge(scheduler, early, late));

      results.Messages.AssertEqual(OnCompleted<int>(230));

      // Each subscription is expected to end at its own source's completion time.
      early.Subscriptions.AssertEqual(Subscribe(201, 210));
      late.Subscriptions.AssertEqual(Subscribe(202, 230));
  }
  /// <summary>
  /// Merges three sources that only complete (at 210, 230 and 240).
  /// Expects a single completion at 240.
  /// </summary>
  [TestMethod]
  public void Merge_Empty3()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(210));
      var second = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(230));
      var third = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(240));

      var results = scheduler.Start(() => Observable.Merge(scheduler, first, second, third));

      results.Messages.AssertEqual(OnCompleted<int>(240));

      // Each subscription is expected to end at its own source's completion time.
      first.Subscriptions.AssertEqual(Subscribe(201, 210));
      second.Subscriptions.AssertEqual(Subscribe(202, 230));
      third.Subscriptions.AssertEqual(Subscribe(203, 240));
  }
  /// <summary>
  /// Two completing sources where the second argument finishes last (240 then 250).
  /// Expects the merged completion at 250.
  /// </summary>
  [TestMethod]
  public void Merge_EmptyDelayed2_RightLast()
  {
      var scheduler = new TestScheduler();

      var left = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(240));
      var right = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(250));

      var results = scheduler.Start(() => Observable.Merge(scheduler, left, right));

      results.Messages.AssertEqual(OnCompleted<int>(250));

      left.Subscriptions.AssertEqual(Subscribe(201, 240));
      right.Subscriptions.AssertEqual(Subscribe(202, 250));
  }
  /// <summary>
  /// Two completing sources where the first argument finishes last (250 versus 240).
  /// Expects the merged completion at 250.
  /// </summary>
  [TestMethod]
  public void Merge_EmptyDelayed2_LeftLast()
  {
      var scheduler = new TestScheduler();

      var left = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(250));
      var right = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(240));

      var results = scheduler.Start(() => Observable.Merge(scheduler, left, right));

      results.Messages.AssertEqual(OnCompleted<int>(250));

      left.Subscriptions.AssertEqual(Subscribe(201, 250));
      right.Subscriptions.AssertEqual(Subscribe(202, 240));
  }
  /// <summary>
  /// Three completing sources (245, 250, 240) where the middle argument finishes last.
  /// Expects the merged completion at 250.
  /// </summary>
  [TestMethod]
  public void Merge_EmptyDelayed3_MiddleLast()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));
      var middle = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(250));
      var last = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(240));

      var results = scheduler.Start(() => Observable.Merge(scheduler, first, middle, last));

      results.Messages.AssertEqual(OnCompleted<int>(250));

      first.Subscriptions.AssertEqual(Subscribe(201, 245));
      middle.Subscriptions.AssertEqual(Subscribe(202, 250));
      last.Subscriptions.AssertEqual(Subscribe(203, 240));
  }
  /// <summary>
  /// Merges a source that completes at 245 with one that never terminates.
  /// Expects no output at all, and the non-terminating source to stay subscribed until 1000.
  /// </summary>
  [TestMethod]
  public void Merge_EmptyNever()
  {
      var scheduler = new TestScheduler();

      var completing = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));
      var silent = scheduler.CreateHotObservable(OnNext(150, 1));

      var results = scheduler.Start(() => Observable.Merge(scheduler, completing, silent));

      // No completion is expected because one source is still open.
      results.Messages.AssertEqual();

      completing.Subscriptions.AssertEqual(Subscribe(201, 245));
      silent.Subscriptions.AssertEqual(Subscribe(202, 1000));
  }
  /// <summary>
  /// Same pairing as the empty/never case but with the non-terminating source passed first.
  /// Expects no output.
  /// </summary>
  [TestMethod]
  public void Merge_NeverEmpty()
  {
      var scheduler = new TestScheduler();

      var silent = scheduler.CreateHotObservable(OnNext(150, 1));
      var completing = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));

      var results = scheduler.Start(() => Observable.Merge(scheduler, silent, completing));

      // No completion is expected because one source is still open.
      results.Messages.AssertEqual();

      silent.Subscriptions.AssertEqual(Subscribe(201, 1000));
      completing.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Merges a source that emits 2 at 210 and completes at 245 with one that never terminates.
  /// Expects only the value at 210 and no completion.
  /// </summary>
  [TestMethod]
  public void Merge_ReturnNever()
  {
      var scheduler = new TestScheduler();

      var single = scheduler.CreateHotObservable(OnNext(150, 1), OnNext(210, 2), OnCompleted<int>(245));
      var silent = scheduler.CreateHotObservable(OnNext(150, 1));

      var results = scheduler.Start(() => Observable.Merge(scheduler, single, silent));

      results.Messages.AssertEqual(OnNext(210, 2));

      single.Subscriptions.AssertEqual(Subscribe(201, 245));
      silent.Subscriptions.AssertEqual(Subscribe(202, 1000));
  }
  /// <summary>
  /// Same pairing as the return/never case but with the non-terminating source passed first.
  /// Expects only the value at 210 and no completion.
  /// </summary>
  [TestMethod]
  public void Merge_NeverReturn()
  {
      var scheduler = new TestScheduler();

      var silent = scheduler.CreateHotObservable(OnNext(150, 1));
      var single = scheduler.CreateHotObservable(OnNext(150, 1), OnNext(210, 2), OnCompleted<int>(245));

      var results = scheduler.Start(() => Observable.Merge(scheduler, silent, single));

      results.Messages.AssertEqual(OnNext(210, 2));

      silent.Subscriptions.AssertEqual(Subscribe(201, 1000));
      single.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Merges a source that emits 2 at 210 and fails at 245 with one that never terminates.
  /// Expects the value followed by the error, and both subscriptions to end at 245.
  /// </summary>
  [TestMethod]
  public void Merge_ErrorNever()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      var failing = scheduler.CreateHotObservable(OnNext(150, 1), OnNext(210, 2), OnError<int>(245, failure));
      var silent = scheduler.CreateHotObservable(OnNext(150, 1));

      var results = scheduler.Start(() => Observable.Merge(scheduler, failing, silent));

      results.Messages.AssertEqual(OnNext(210, 2), OnError<int>(245, failure));

      // The non-terminating source is expected to be released at the error time as well.
      failing.Subscriptions.AssertEqual(Subscribe(201, 245));
      silent.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Same pairing as the error/never case but with the non-terminating source passed first.
  /// Expects the value at 210 followed by the error at 245.
  /// </summary>
  [TestMethod]
  public void Merge_NeverError()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      var silent = scheduler.CreateHotObservable(OnNext(150, 1));
      var failing = scheduler.CreateHotObservable(OnNext(150, 1), OnNext(210, 2), OnError<int>(245, failure));

      var results = scheduler.Start(() => Observable.Merge(scheduler, silent, failing));

      results.Messages.AssertEqual(OnNext(210, 2), OnError<int>(245, failure));

      // Both subscriptions are expected to end at the error time.
      silent.Subscriptions.AssertEqual(Subscribe(201, 245));
      failing.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Merges a source that only completes (245) with one that emits 2 at 210 and completes at 250.
  /// Expects the value and then completion at 250.
  /// </summary>
  [TestMethod]
  public void Merge_EmptyReturn()
  {
      var scheduler = new TestScheduler();

      var empty = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));
      var single = scheduler.CreateHotObservable(OnNext(150, 1), OnNext(210, 2), OnCompleted<int>(250));

      var results = scheduler.Start(() => Observable.Merge(scheduler, empty, single));

      results.Messages.AssertEqual(OnNext(210, 2), OnCompleted<int>(250));

      empty.Subscriptions.AssertEqual(Subscribe(201, 245));
      single.Subscriptions.AssertEqual(Subscribe(202, 250));
  }
  /// <summary>
  /// Same pairing as the empty/return case but with the value-producing source passed first.
  /// Expects the value at 210 and then completion at 250.
  /// </summary>
  [TestMethod]
  public void Merge_ReturnEmpty()
  {
      var scheduler = new TestScheduler();

      var single = scheduler.CreateHotObservable(OnNext(150, 1), OnNext(210, 2), OnCompleted<int>(250));
      var empty = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted<int>(245));

      var results = scheduler.Start(() => Observable.Merge(scheduler, single, empty));

      results.Messages.AssertEqual(OnNext(210, 2), OnCompleted<int>(250));

      single.Subscriptions.AssertEqual(Subscribe(201, 250));
      empty.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Merges two sources whose values alternate in time (even values from one, odd from the other).
  /// Expects the values 2..9 interleaved in time order, then completion at 250.
  /// </summary>
  [TestMethod]
  public void Merge_Lots2()
  {
      var scheduler = new TestScheduler();

      var evens = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2), OnNext(220, 4), OnNext(230, 6), OnNext(240, 8),
          OnCompleted<int>(245));

      var odds = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 3), OnNext(225, 5), OnNext(235, 7), OnNext(245, 9),
          OnCompleted<int>(250));

      var results = scheduler.Start(() => Observable.Merge(scheduler, evens, odds));

      results.Messages.AssertEqual(
          OnNext(210, 2), OnNext(215, 3),
          OnNext(220, 4), OnNext(225, 5),
          OnNext(230, 6), OnNext(235, 7),
          OnNext(240, 8), OnNext(245, 9),
          OnCompleted<int>(250));

      evens.Subscriptions.AssertEqual(Subscribe(201, 245));
      odds.Subscriptions.AssertEqual(Subscribe(202, 250));
  }
  /// <summary>
  /// Merges an array of three sources through the enumerable <c>Merge(scheduler)</c> extension.
  /// Expects the values 2..9 in time order, then completion at 250.
  /// </summary>
  [TestMethod]
  public void Merge_Lots3()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2), OnNext(225, 5), OnNext(240, 8),
          OnCompleted<int>(245));

      var second = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 3), OnNext(230, 6), OnNext(245, 9),
          OnCompleted<int>(250));

      var third = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 4), OnNext(235, 7),
          OnCompleted<int>(240));

      var results = scheduler.Start(() => new[] { first, second, third }.Merge(scheduler));

      results.Messages.AssertEqual(
          OnNext(210, 2), OnNext(215, 3), OnNext(220, 4),
          OnNext(225, 5), OnNext(230, 6), OnNext(235, 7),
          OnNext(240, 8), OnNext(245, 9),
          OnCompleted<int>(250));

      first.Subscriptions.AssertEqual(Subscribe(201, 245));
      second.Subscriptions.AssertEqual(Subscribe(202, 250));
      third.Subscriptions.AssertEqual(Subscribe(203, 240));
  }
  /// <summary>
  /// Merges N (= 10) generated hot sources and compares the output with an expectation
  /// computed from the same input data: every OnNext at time 200 or later, in time order,
  /// followed by one completion at the latest completion time among the sources.
  /// </summary>
  /// <remarks>
  /// Source <c>i</c> holds an OnNext at time 150, then <c>i</c> values starting at
  /// time <c>301 + i</c>, then a completion.
  /// </remarks>
  [TestMethod]
  public void Merge_LotsMore()
  {
      const int N = 10;

      var inputs = new List<List<Recorded<Notification<int>>>>();

      for (var i = 0; i < N; i++)
      {
          var lst = new List<Recorded<Notification<int>>> { OnNext(150, 1) };
          inputs.Add(lst);

          var start = (ushort)(301 + i);

          for (var j = 0; j < i; j++)
          {
              // The gap between consecutive values grows by 5 ticks per step.
              var onNext = OnNext(start += (ushort)(j * 5), j + i + 2);
              lst.Add(onNext);
          }

          lst.Add(OnCompleted<int>((ushort)(start + N - i)));
      }

      // Flatten in source order with a single pass instead of re-copying the list per source.
      var inputsFlat = inputs.SelectMany(lst => lst).ToArray();

      // orderby is a stable sort, so notifications sharing a timestamp keep source order.
      var resOnNext = (from n in inputsFlat
                       where n.Time >= 200
                       where n.Value.Kind == NotificationKind.OnNext
                       orderby n.Time
                       select n).ToList();

      var lastCompleted = (from n in inputsFlat
                           where n.Time >= 200
                           where n.Value.Kind == NotificationKind.OnCompleted
                           orderby n.Time descending
                           select n).First();

      var scheduler = new TestScheduler();

      // The trailing ToArray forces the hot observables to be created now rather than lazily.
      var xss = inputs.Select(lst => (IObservable<int>)scheduler.CreateHotObservable(lst.ToArray())).ToArray();

      var res = scheduler.Start(() =>
          xss.Merge(scheduler)
      );

      // Assert.Equal reports expected and actual values on failure, unlike Assert.True(a == b).
      Assert.Equal(resOnNext.Count + 1, res.Messages.Count);

      for (var i = 0; i < resOnNext.Count; i++)
      {
          var msg = res.Messages[i];

          Assert.Equal(resOnNext[i].Time, msg.Time);
          Assert.Equal(NotificationKind.OnNext, msg.Value.Kind);
          Assert.Equal(resOnNext[i].Value.Value, msg.Value.Value);
      }

      // The final message must be the completion, at the latest completion time.
      var completion = res.Messages[resOnNext.Count];

      Assert.Equal(NotificationKind.OnCompleted, completion.Value.Kind);
      Assert.Equal(lastCompleted.Time, completion.Time);
  }
  /// <summary>
  /// Merges two sources through the scheduler-last overload, where the first source fails at 245.
  /// Expects both earlier values, then the error, with both subscriptions ending at 245.
  /// </summary>
  [TestMethod]
  public void Merge_ErrorLeft()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      var failing = scheduler.CreateHotObservable(OnNext(150, 1), OnNext(210, 2), OnError<int>(245, failure));
      var healthy = scheduler.CreateHotObservable(OnNext(150, 1), OnNext(215, 3), OnCompleted<int>(250));

      var results = scheduler.Start(() => Observable.Merge(failing, healthy, scheduler));

      results.Messages.AssertEqual(OnNext(210, 2), OnNext(215, 3), OnError<int>(245, failure));

      // The second source's completion at 250 is never reached.
      failing.Subscriptions.AssertEqual(Subscribe(201, 245));
      healthy.Subscriptions.AssertEqual(Subscribe(202, 245));
  }
  /// <summary>
  /// Merges a source that fails at 210 with one that would emit at 220.
  /// Expects only the error, and both subscriptions to end at 210.
  /// </summary>
  [TestMethod]
  public void Merge_ErrorCausesDisposal()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      // Fails before the other source produces anything.
      var failing = scheduler.CreateHotObservable(OnNext(150, 1), OnError<int>(210, failure));

      // The value at 220 must not appear in the output.
      var other = scheduler.CreateHotObservable(OnNext(150, 1), OnNext(220, 1), OnCompleted<int>(230));

      var results = scheduler.Start(() => Observable.Merge(failing, other, scheduler));

      results.Messages.AssertEqual(OnError<int>(210, failure));

      failing.Subscriptions.AssertEqual(Subscribe(201, 210));
      other.Subscriptions.AssertEqual(Subscribe(202, 210));
  }
  /// <summary>
  /// Merges a hot sequence that emits three cold inner sequences at 300, 400 and 500.
  /// Expects the inner values shifted by each inner's arrival time and interleaved,
  /// with completion at 650.
  /// </summary>
  [TestMethod]
  public void Merge_ObservableOfObservable_Data()
  {
      var scheduler = new TestScheduler();

      var inner1 = scheduler.CreateColdObservable(
          OnNext(10, 101), OnNext(20, 102),
          OnNext(110, 103), OnNext(120, 104),
          OnNext(210, 105), OnNext(220, 106),
          OnCompleted<int>(230));

      var inner2 = scheduler.CreateColdObservable(
          OnNext(10, 201), OnNext(20, 202), OnNext(30, 203), OnNext(40, 204),
          OnCompleted<int>(50));

      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 301), OnNext(20, 302), OnNext(30, 303), OnNext(40, 304),
          OnNext(120, 305),
          OnCompleted<int>(150));

      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(300, inner1),
          OnNext<IObservable<int>>(400, inner2),
          OnNext<IObservable<int>>(500, inner3),
          OnCompleted<IObservable<int>>(600));

      var results = scheduler.Start(() => outer.Merge());

      results.Messages.AssertEqual(
          OnNext(310, 101), OnNext(320, 102),
          OnNext(410, 103), OnNext(410, 201),
          OnNext(420, 104), OnNext(420, 202),
          OnNext(430, 203), OnNext(440, 204),
          OnNext(510, 105), OnNext(510, 301),
          OnNext(520, 106), OnNext(520, 302),
          OnNext(530, 303), OnNext(540, 304),
          OnNext(620, 305),
          OnCompleted<int>(650));

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      outer.Subscriptions.AssertEqual(Subscribe(200, 600));
#else
      outer.Subscriptions.AssertEqual(Subscribe(200, 650));
#endif

      inner1.Subscriptions.AssertEqual(Subscribe(300, 530));
      inner2.Subscriptions.AssertEqual(Subscribe(400, 450));
      inner3.Subscriptions.AssertEqual(Subscribe(500, 650));
  }
  /// <summary>
  /// Merges three cold inner sequences whose values do not overlap in time.
  /// Expects each inner's values in turn and completion at 600, when the outer sequence completes.
  /// </summary>
  [TestMethod]
  public void Merge_ObservableOfObservable_Data_NonOverlapped()
  {
      var scheduler = new TestScheduler();

      var inner1 = scheduler.CreateColdObservable(
          OnNext(10, 101), OnNext(20, 102),
          OnCompleted<int>(230));

      var inner2 = scheduler.CreateColdObservable(
          OnNext(10, 201), OnNext(20, 202), OnNext(30, 203), OnNext(40, 204),
          OnCompleted<int>(50));

      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 301), OnNext(20, 302), OnNext(30, 303), OnNext(40, 304),
          OnCompleted<int>(50));

      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(300, inner1),
          OnNext<IObservable<int>>(400, inner2),
          OnNext<IObservable<int>>(500, inner3),
          OnCompleted<IObservable<int>>(600));

      var results = scheduler.Start(() => outer.Merge());

      results.Messages.AssertEqual(
          OnNext(310, 101), OnNext(320, 102),
          OnNext(410, 201), OnNext(420, 202), OnNext(430, 203), OnNext(440, 204),
          OnNext(510, 301), OnNext(520, 302), OnNext(530, 303), OnNext(540, 304),
          OnCompleted<int>(600));

      outer.Subscriptions.AssertEqual(Subscribe(200, 600));

      inner1.Subscriptions.AssertEqual(Subscribe(300, 530));
      inner2.Subscriptions.AssertEqual(Subscribe(400, 450));
      inner3.Subscriptions.AssertEqual(Subscribe(500, 550));
  }
  /// <summary>
  /// Merges inner sequences where the second inner fails 50 ticks after it arrives at 400.
  /// Expects the error at 450, every active subscription to end at 450, and the third
  /// inner never to be subscribed.
  /// </summary>
  [TestMethod]
  public void Merge_ObservableOfObservable_InnerThrows()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      var inner1 = scheduler.CreateColdObservable(
          OnNext(10, 101), OnNext(20, 102),
          OnNext(110, 103), OnNext(120, 104),
          OnNext(210, 105), OnNext(220, 106),
          OnCompleted<int>(230));

      var inner2 = scheduler.CreateColdObservable(
          OnNext(10, 201), OnNext(20, 202), OnNext(30, 203), OnNext(40, 204),
          OnError<int>(50, failure));

      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 301), OnNext(20, 302), OnNext(30, 303), OnNext(40, 304),
          OnCompleted<int>(150));

      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(300, inner1),
          OnNext<IObservable<int>>(400, inner2),
          OnNext<IObservable<int>>(500, inner3),
          OnCompleted<IObservable<int>>(600));

      var results = scheduler.Start(() => outer.Merge());

      results.Messages.AssertEqual(
          OnNext(310, 101), OnNext(320, 102),
          OnNext(410, 103), OnNext(410, 201),
          OnNext(420, 104), OnNext(420, 202),
          OnNext(430, 203), OnNext(440, 204),
          OnError<int>(450, failure));

      outer.Subscriptions.AssertEqual(Subscribe(200, 450));

      inner1.Subscriptions.AssertEqual(Subscribe(300, 450));
      inner2.Subscriptions.AssertEqual(Subscribe(400, 450));

      // The third inner would only arrive at 500, after the failure.
      inner3.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Merges inner sequences where the outer sequence itself fails at 500.
  /// Expects the values produced before 500, then the error, with the still-running
  /// first inner released at 500.
  /// </summary>
  [TestMethod]
  public void Merge_ObservableOfObservable_OuterThrows()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      var inner1 = scheduler.CreateColdObservable(
          OnNext(10, 101), OnNext(20, 102),
          OnNext(110, 103), OnNext(120, 104),
          OnNext(210, 105), OnNext(220, 106),
          OnCompleted<int>(230));

      var inner2 = scheduler.CreateColdObservable(
          OnNext(10, 201), OnNext(20, 202), OnNext(30, 203), OnNext(40, 204),
          OnCompleted<int>(50));

      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(300, inner1),
          OnNext<IObservable<int>>(400, inner2),
          OnError<IObservable<int>>(500, failure));

      var results = scheduler.Start(() => outer.Merge());

      results.Messages.AssertEqual(
          OnNext(310, 101), OnNext(320, 102),
          OnNext(410, 103), OnNext(410, 201),
          OnNext(420, 104), OnNext(420, 202),
          OnNext(430, 203), OnNext(440, 204),
          OnError<int>(500, failure));

      outer.Subscriptions.AssertEqual(Subscribe(200, 500));

      inner1.Subscriptions.AssertEqual(Subscribe(300, 500));
      inner2.Subscriptions.AssertEqual(Subscribe(400, 450));
  }
  /// <summary>
  /// Merges two single-value sequences with the two-source extension form and no scheduler.
  /// The result is sorted before comparison, so arrival order is not asserted.
  /// </summary>
  [TestMethod]
  public void Merge_Binary_DefaultScheduler()
  {
      var merged = Observable.Return(1).Merge(Observable.Return(2));
      var sorted = merged.ToEnumerable().OrderBy(x => x);

      Assert.True(sorted.SequenceEqual([1, 2]));
  }
  /// <summary>
  /// Merges two single-value sequences through the static <c>Observable.Merge</c> call with no scheduler.
  /// The result is sorted before comparison, so arrival order is not asserted.
  /// </summary>
  [TestMethod]
  public void Merge_Params_DefaultScheduler()
  {
      var merged = Observable.Merge(Observable.Return(1), Observable.Return(2));
      var sorted = merged.ToEnumerable().OrderBy(x => x);

      Assert.True(sorted.SequenceEqual([1, 2]));
  }
  /// <summary>
  /// Merges an <see cref="IEnumerable{T}"/> of two single-value sequences with no scheduler.
  /// The result is sorted before comparison, so arrival order is not asserted.
  /// </summary>
  [TestMethod]
  public void Merge_IEnumerableOfIObservable_DefaultScheduler()
  {
      // Typing the collection as IEnumerable selects the enumerable overload of Merge.
      IEnumerable<IObservable<int>> sources = [Observable.Return(1), Observable.Return(2)];

      var sorted = Observable.Merge(sources).ToEnumerable().OrderBy(x => x);

      Assert.True(sorted.SequenceEqual([1, 2]));
  }
  /// <summary>
  /// Checks argument validation for the <c>Merge</c> calls that take an integer argument and for
  /// <c>Concat</c> over nested sequences: null arguments must throw <see cref="ArgumentNullException"/>
  /// and an integer argument of 0 must throw <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [TestMethod]
  public void MergeConcat_ArgumentChecking()
  {
      // Enumerable source with integer argument and scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Merge(default(IEnumerable<IObservable<int>>), 1, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Merge(DummyEnumerable<IObservable<int>>.Instance, 0, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Merge(DummyEnumerable<IObservable<int>>.Instance, 1, null));

      // Enumerable source with integer argument only.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Merge(default(IEnumerable<IObservable<int>>), 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Merge(DummyEnumerable<IObservable<int>>.Instance, 0));

      // Observable-of-observables source with integer argument.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Merge(default(IObservable<IObservable<int>>), 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Merge(DummyObservable<IObservable<int>>.Instance, 0));

      // Concat over a null sequence of sequences or of tasks.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Concat(default(IObservable<IObservable<int>>)));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Concat(default(IObservable<Task<int>>)));
  }
  /// <summary>
  /// Merges an enumerable of three ranges (10.., 20.., 30..) with an integer argument of 1 and an
  /// explicit <see cref="DefaultScheduler"/>, and expects the ranges back to back in source order.
  /// </summary>
  /// <remarks>
  /// This test previously called <c>Merge(1)</c> without a scheduler while its sibling
  /// <c>MergeConcat_Enumerable</c> passed one; the two bodies were exchanged so that each
  /// name matches the overload it exercises. Both overloads remain covered.
  /// </remarks>
  [TestMethod]
  public void MergeConcat_Enumerable_Scheduler()
  {
      var b = Enumerable.Range(1, 3).Select(x => Observable.Range(x * 10, 3)).Merge(1, DefaultScheduler.Instance)
          .SequenceEqual(new[] { 10, 11, 12, 20, 21, 22, 30, 31, 32 }.ToObservable())
          .First();

      Assert.True(b);
  }

  /// <summary>
  /// Merges an enumerable of three ranges (10.., 20.., 30..) with an integer argument of 1 and
  /// no scheduler, and expects the ranges back to back in source order.
  /// </summary>
  [TestMethod]
  public void MergeConcat_Enumerable()
  {
      var b = Enumerable.Range(1, 3).Select(x => Observable.Range(x * 10, 3)).Merge(1)
          .SequenceEqual(new[] { 10, 11, 12, 20, 21, 22, 30, 31, 32 }.ToObservable())
          .First();

      Assert.True(b);
  }
  /// <summary>
  /// Applies <c>Concat</c> to an observable of three ranges (10.., 20.., 30..) and expects
  /// the ranges back to back in source order.
  /// </summary>
  [TestMethod]
  public void MergeConcat_Default()
  {
      var expected = new[] { 10, 11, 12, 20, 21, 22, 30, 31, 32 }.ToObservable();
      var concatenated = Observable.Range(1, 3).Select(x => Observable.Range(x * 10, 3)).Concat();

      // SequenceEqual yields a single boolean; First() blocks for it.
      var matches = concatenated.SequenceEqual(expected).First();

      Assert.True(matches);
  }
  /// <summary>
  /// Runs <c>Merge(2)</c> over four cold inner sequences delivered by a hot outer sequence.
  /// The asserted subscription times show the third and fourth inners being subscribed
  /// only when an earlier inner's subscription ends (350 and 460).
  /// </summary>
  [TestMethod]
  public void MergeConcat_Basic()
  {
      var scheduler = new TestScheduler();

      var inner1 = scheduler.CreateColdObservable(
          OnNext(50, 1), OnNext(100, 2), OnNext(120, 3),
          OnCompleted<int>(140));

      var inner2 = scheduler.CreateColdObservable(
          OnNext(20, 4), OnNext(70, 5),
          OnCompleted<int>(200));

      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 6), OnNext(90, 7), OnNext(110, 8),
          OnCompleted<int>(130));

      var inner4 = scheduler.CreateColdObservable(
          OnNext(210, 9), OnNext(240, 10),
          OnCompleted<int>(300));

      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, inner1),
          OnNext<IObservable<int>>(260, inner2),
          OnNext<IObservable<int>>(270, inner3),
          OnNext<IObservable<int>>(320, inner4),
          OnCompleted<IObservable<int>>(400));

      var results = scheduler.Start(() => outer.Merge(2));

      results.Messages.AssertEqual(
          OnNext(260, 1), OnNext(280, 4),
          OnNext(310, 2), OnNext(330, 3), OnNext(330, 5),
          OnNext(360, 6), OnNext(440, 7), OnNext(460, 8),
          OnNext(670, 9), OnNext(700, 10),
          OnCompleted<int>(760));

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      outer.Subscriptions.AssertEqual(Subscribe(200, 400));
#else
      outer.Subscriptions.AssertEqual(Subscribe(200, 760));
#endif

      inner1.Subscriptions.AssertEqual(Subscribe(210, 350));
      inner2.Subscriptions.AssertEqual(Subscribe(260, 460));
      inner3.Subscriptions.AssertEqual(Subscribe(350, 480));
      inner4.Subscriptions.AssertEqual(Subscribe(460, 760));
  }
  /// <summary>
  /// Variant of the basic <c>Merge(2)</c> scenario in which the second inner completes after 300
  /// ticks. The fourth inner is then expected to be subscribed at 480, when the third inner's
  /// subscription ends, and the merged sequence to complete at 780.
  /// </summary>
  [TestMethod]
  public void MergeConcat_Basic_Long()
  {
      var scheduler = new TestScheduler();

      var inner1 = scheduler.CreateColdObservable(
          OnNext(50, 1), OnNext(100, 2), OnNext(120, 3),
          OnCompleted<int>(140));

      var inner2 = scheduler.CreateColdObservable(
          OnNext(20, 4), OnNext(70, 5),
          OnCompleted<int>(300));

      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 6), OnNext(90, 7), OnNext(110, 8),
          OnCompleted<int>(130));

      var inner4 = scheduler.CreateColdObservable(
          OnNext(210, 9), OnNext(240, 10),
          OnCompleted<int>(300));

      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, inner1),
          OnNext<IObservable<int>>(260, inner2),
          OnNext<IObservable<int>>(270, inner3),
          OnNext<IObservable<int>>(320, inner4),
          OnCompleted<IObservable<int>>(400));

      var results = scheduler.Start(() => outer.Merge(2));

      results.Messages.AssertEqual(
          OnNext(260, 1), OnNext(280, 4),
          OnNext(310, 2), OnNext(330, 3), OnNext(330, 5),
          OnNext(360, 6), OnNext(440, 7), OnNext(460, 8),
          OnNext(690, 9), OnNext(720, 10),
          OnCompleted<int>(780));

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      outer.Subscriptions.AssertEqual(Subscribe(200, 400));
#else
      outer.Subscriptions.AssertEqual(Subscribe(200, 780));
#endif

      inner1.Subscriptions.AssertEqual(Subscribe(210, 350));
      inner2.Subscriptions.AssertEqual(Subscribe(260, 560));
      inner3.Subscriptions.AssertEqual(Subscribe(350, 480));
      inner4.Subscriptions.AssertEqual(Subscribe(480, 780));
  }
  /// <summary>
  /// Runs <c>Merge(3)</c> over four cold inner sequences. The asserted subscription times show
  /// every inner being subscribed at the moment the outer sequence delivers it
  /// (210, 260, 270 and 420), with completion at 720.
  /// </summary>
  [TestMethod]
  public void MergeConcat_Basic_Wide()
  {
      var scheduler = new TestScheduler();

      var inner1 = scheduler.CreateColdObservable(
          OnNext(50, 1), OnNext(100, 2), OnNext(120, 3),
          OnCompleted<int>(140));

      var inner2 = scheduler.CreateColdObservable(
          OnNext(20, 4), OnNext(70, 5),
          OnCompleted<int>(300));

      var inner3 = scheduler.CreateColdObservable(
          OnNext(10, 6), OnNext(90, 7), OnNext(110, 8),
          OnCompleted<int>(130));

      var inner4 = scheduler.CreateColdObservable(
          OnNext(210, 9), OnNext(240, 10),
          OnCompleted<int>(300));

      var outer = scheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, inner1),
          OnNext<IObservable<int>>(260, inner2),
          OnNext<IObservable<int>>(270, inner3),
          OnNext<IObservable<int>>(420, inner4),
          OnCompleted<IObservable<int>>(450));

      var results = scheduler.Start(() => outer.Merge(3));

      results.Messages.AssertEqual(
          OnNext(260, 1), OnNext(280, 4), OnNext(280, 6),
          OnNext(310, 2), OnNext(330, 3), OnNext(330, 5),
          OnNext(360, 7), OnNext(380, 8),
          OnNext(630, 9), OnNext(660, 10),
          OnCompleted<int>(720));

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      outer.Subscriptions.AssertEqual(Subscribe(200, 450));
#else
      outer.Subscriptions.AssertEqual(Subscribe(200, 720));
#endif

      inner1.Subscriptions.AssertEqual(Subscribe(210, 350));
      inner2.Subscriptions.AssertEqual(Subscribe(260, 560));
      inner3.Subscriptions.AssertEqual(Subscribe(270, 400));
      inner4.Subscriptions.AssertEqual(Subscribe(420, 720));
  }
  /// <summary>
  /// Merges four cold inner sequences with a concurrency limit of three. The outer sequence
  /// completes at 750, after the last inner sequence has ended (720), so the merged sequence
  /// is expected to complete together with the outer sequence.
  /// </summary>
  [TestMethod]
  public void MergeConcat_Basic_Late()
  {
      var testScheduler = new TestScheduler();

      // Cold inner sequences: the times below are relative to the moment of subscription.
      var first = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140)
      );

      var second = testScheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(300)
      );

      var third = testScheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnCompleted<int>(130)
      );

      var fourth = testScheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300)
      );

      // Hot outer sequence handing out the inner sequences at absolute times.
      var outer = testScheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, first),
          OnNext<IObservable<int>>(260, second),
          OnNext<IObservable<int>>(270, third),
          OnNext<IObservable<int>>(420, fourth),
          OnCompleted<IObservable<int>>(750)
      );

      var observer = testScheduler.Start(() =>
          outer.Merge(3)
      );

      observer.Messages.AssertEqual(
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(280, 6),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 7),
          OnNext(380, 8),
          OnNext(630, 9),
          OnNext(660, 10),
          OnCompleted<int>(750)
      );

      // The outer subscription is expected to last until the outer sequence itself completes.
      outer.Subscriptions.AssertEqual(
          Subscribe(200, 750)
      );

      first.Subscriptions.AssertEqual(
          Subscribe(210, 350)
      );

      second.Subscriptions.AssertEqual(
          Subscribe(260, 560)
      );

      third.Subscriptions.AssertEqual(
          Subscribe(270, 400)
      );

      fourth.Subscriptions.AssertEqual(
          Subscribe(420, 720)
      );
  }
  /// <summary>
  /// Merges four cold inner sequences with a concurrency limit of two and disposes the
  /// subscription at 450. Only notifications up to that time are expected, and the fourth
  /// inner sequence is expected never to be subscribed.
  /// </summary>
  [TestMethod]
  public void MergeConcat_Disposed()
  {
      // Virtual time at which the test scheduler disposes the merged subscription.
      const long disposeAt = 450;

      var testScheduler = new TestScheduler();

      // Cold inner sequences: the times below are relative to the moment of subscription.
      var first = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140)
      );

      var second = testScheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(200)
      );

      var third = testScheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnCompleted<int>(130)
      );

      var fourth = testScheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300)
      );

      // Hot outer sequence handing out the inner sequences at absolute times.
      var outer = testScheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, first),
          OnNext<IObservable<int>>(260, second),
          OnNext<IObservable<int>>(270, third),
          OnNext<IObservable<int>>(320, fourth),
          OnCompleted<IObservable<int>>(400)
      );

      var observer = testScheduler.Start(() =>
          outer.Merge(2),
          disposeAt
      );

      // No terminal notification is expected: the subscription is disposed first.
      observer.Messages.AssertEqual(
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(440, 7)
      );

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      const long outerUnsubscribedAt = 400;
#else
      const long outerUnsubscribedAt = 450;
#endif
      outer.Subscriptions.AssertEqual(
          Subscribe(200, outerUnsubscribedAt)
      );

      first.Subscriptions.AssertEqual(
          Subscribe(210, 350)
      );

      second.Subscriptions.AssertEqual(
          Subscribe(260, 450)
      );

      // The third inner sequence is expected to start only once the first has ended (350).
      third.Subscriptions.AssertEqual(
          Subscribe(350, 450)
      );

      // The fourth inner sequence is expected to have no subscriptions at all.
      fourth.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// Merges four cold inner sequences with a concurrency limit of two where the outer
  /// sequence fails at 400. The error is expected to be forwarded at that time and all
  /// running inner subscriptions are expected to end with it.
  /// </summary>
  [TestMethod]
  public void MergeConcat_OuterError()
  {
      var testScheduler = new TestScheduler();

      var failure = new Exception();

      // Cold inner sequences: the times below are relative to the moment of subscription.
      var first = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140)
      );

      var second = testScheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(200)
      );

      var third = testScheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnCompleted<int>(130)
      );

      var fourth = testScheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300)
      );

      // Hot outer sequence that terminates with an error instead of completing.
      var outer = testScheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, first),
          OnNext<IObservable<int>>(260, second),
          OnNext<IObservable<int>>(270, third),
          OnNext<IObservable<int>>(320, fourth),
          OnError<IObservable<int>>(400, failure)
      );

      var observer = testScheduler.Start(() =>
          outer.Merge(2)
      );

      observer.Messages.AssertEqual(
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 6),
          OnError<int>(400, failure)
      );

      outer.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      first.Subscriptions.AssertEqual(
          Subscribe(210, 350)
      );

      second.Subscriptions.AssertEqual(
          Subscribe(260, 400)
      );

      // The third inner sequence is expected to start only once the first has ended (350).
      third.Subscriptions.AssertEqual(
          Subscribe(350, 400)
      );

      // The fourth inner sequence is expected to have no subscriptions at all.
      fourth.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// Merges four cold inner sequences with a concurrency limit of two where the third inner
  /// sequence fails 140 ticks after being subscribed. The error is expected to surface at 490
  /// and to end the subscription to the fourth inner sequence that is running at that time.
  /// </summary>
  [TestMethod]
  public void MergeConcat_InnerError()
  {
      var testScheduler = new TestScheduler();

      var failure = new Exception();

      // Cold inner sequences: the times below are relative to the moment of subscription.
      var first = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(120, 3),
          OnCompleted<int>(140)
      );

      var second = testScheduler.CreateColdObservable(
          OnNext(20, 4),
          OnNext(70, 5),
          OnCompleted<int>(200)
      );

      // The only inner sequence that terminates with an error.
      var third = testScheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(90, 7),
          OnNext(110, 8),
          OnError<int>(140, failure)
      );

      var fourth = testScheduler.CreateColdObservable(
          OnNext(210, 9),
          OnNext(240, 10),
          OnCompleted<int>(300)
      );

      // Hot outer sequence handing out the inner sequences at absolute times.
      var outer = testScheduler.CreateHotObservable(
          OnNext<IObservable<int>>(210, first),
          OnNext<IObservable<int>>(260, second),
          OnNext<IObservable<int>>(270, third),
          OnNext<IObservable<int>>(320, fourth),
          OnCompleted<IObservable<int>>(400)
      );

      var observer = testScheduler.Start(() =>
          outer.Merge(2)
      );

      observer.Messages.AssertEqual(
          OnNext(260, 1),
          OnNext(280, 4),
          OnNext(310, 2),
          OnNext(330, 3),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(440, 7),
          OnNext(460, 8),
          OnError<int>(490, failure)
      );

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      const long outerUnsubscribedAt = 400;
#else
      const long outerUnsubscribedAt = 490;
#endif
      outer.Subscriptions.AssertEqual(
          Subscribe(200, outerUnsubscribedAt)
      );

      first.Subscriptions.AssertEqual(
          Subscribe(210, 350)
      );

      second.Subscriptions.AssertEqual(
          Subscribe(260, 460)
      );

      third.Subscriptions.AssertEqual(
          Subscribe(350, 490)
      );

      // The fourth inner sequence is expected to start when the second ends (460) and to be
      // cut short by the error at 490.
      fourth.Subscriptions.AssertEqual(
          Subscribe(460, 490)
      );
  }
  /// <summary>
  /// Merges an observable sequence of three started tasks and checks that the merged
  /// sequence yields each task's result, regardless of arrival order.
  /// </summary>
  [TestMethod]
  public void Merge_Task()
  {
      Task<int>[] tasks =
      [
          Task.Factory.StartNew(() => 1),
          Task.Factory.StartNew(() => 2),
          Task.Factory.StartNew(() => 3),
      ];

      var merged = Observable.Merge(tasks.ToObservable());

      // Blocks until the merged sequence has completed and hands back all of its values.
      var values = merged.ToArray().Single();

      // Sort before comparing: the order in which the results arrive is not asserted.
      var sorted = values.OrderBy(x => x);
      Assert.True(sorted.SequenceEqual([1, 2, 3]));
  }
  /// <summary>
  /// Merges two tasks that are completed successfully only after the merged sequence has
  /// been subscribed to, and checks that both results are delivered before completion.
  /// </summary>
  [TestMethod]
  public void Merge_TaskWithCompletionSource_RanToCompletion_Async()
  {
      var tcss = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
      };

      var res = Observable.Merge(Observable.Range(0, 2).Select(x => tcss[x].Task));

      var lst = new List<int>();

      // The event is declared first so that it is disposed last, after the subscription
      // whose callback signals it.
      using var done = new ManualResetEvent(false);
      using var subscription = res.Subscribe(lst.Add, () => done.Set());

      // Complete the tasks after subscribing.
      tcss[0].SetResult(42);
      tcss[1].SetResult(43);

      // A bounded wait turns a lost completion into a test failure rather than a hung run.
      Assert.True(done.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the merged sequence to complete.");

      lst.OrderBy(x => x).AssertEqual([42, 43]);
  }
  /// <summary>
  /// Merges two tasks that have already completed successfully before the merged sequence
  /// is subscribed to, and checks that both results are delivered before completion.
  /// </summary>
  [TestMethod]
  public void Merge_TaskWithCompletionSource_RanToCompletion_Sync()
  {
      var tcss = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
      };

      // Complete the tasks before anything subscribes.
      tcss[0].SetResult(42);
      tcss[1].SetResult(43);

      var res = Observable.Merge(Observable.Range(0, 2).Select(x => tcss[x].Task));

      var lst = new List<int>();

      // The event is declared first so that it is disposed last, after the subscription
      // whose callback signals it.
      using var done = new ManualResetEvent(false);
      using var subscription = res.Subscribe(lst.Add, () => done.Set());

      // A bounded wait turns a lost completion into a test failure rather than a hung run.
      Assert.True(done.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the merged sequence to complete.");

      lst.OrderBy(x => x).AssertEqual([42, 43]);
  }
  /// <summary>
  /// Merges three pending tasks and faults the second one after subscribing. The merged
  /// sequence is expected to produce no values and to report that exact exception.
  /// </summary>
  [TestMethod]
  public void Merge_TaskWithCompletionSource_Faulted_Async()
  {
      var tcss = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
      };

      var res = Observable.Merge(Observable.Range(0, 3).Select(x => tcss[x].Task));

      var lst = new List<int>();
      var err = default(Exception);

      // The event is declared first so that it is disposed last, after the subscription
      // whose callbacks signal it.
      using var done = new ManualResetEvent(false);
      using var subscription = res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());

      // Fault one task after subscribing; the other two are left pending.
      var ex = new Exception();
      tcss[1].SetException(ex);

      // A bounded wait turns a lost notification into a test failure rather than a hung run.
      Assert.True(done.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the merged sequence to terminate.");

      lst.AssertEqual([]);
      Assert.Same(ex, err);
  }
  /// <summary>
  /// Merges three tasks of which the second is already faulted before subscribing. The
  /// merged sequence is expected to produce no values and to report that exact exception.
  /// </summary>
  [TestMethod]
  public void Merge_TaskWithCompletionSource_Faulted_Sync()
  {
      var tcss = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
      };

      // Fault one task before anything subscribes; the other two are left pending.
      var ex = new Exception();
      tcss[1].SetException(ex);

      var res = Observable.Merge(Observable.Range(0, 3).Select(x => tcss[x].Task));

      var lst = new List<int>();
      var err = default(Exception);

      // The event is declared first so that it is disposed last, after the subscription
      // whose callbacks signal it.
      using var done = new ManualResetEvent(false);
      using var subscription = res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());

      // A bounded wait turns a lost notification into a test failure rather than a hung run.
      Assert.True(done.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the merged sequence to terminate.");

      lst.AssertEqual([]);
      Assert.Same(ex, err);
  }
  /// <summary>
  /// Merges three pending tasks and cancels the second one after subscribing. The merged
  /// sequence is expected to produce no values and to fail with a
  /// <see cref="TaskCanceledException"/> that refers to the canceled task.
  /// </summary>
  [TestMethod]
  public void Merge_TaskWithCompletionSource_Canceled_Async()
  {
      var tcss = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
      };

      var res = Observable.Merge(Observable.Range(0, 3).Select(x => tcss[x].Task));

      var lst = new List<int>();
      var err = default(Exception);

      // The event is declared first so that it is disposed last, after the subscription
      // whose callbacks signal it.
      using var done = new ManualResetEvent(false);
      using var subscription = res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());

      // Cancel one task after subscribing; the other two are left pending.
      tcss[1].SetCanceled();

      // A bounded wait turns a lost notification into a test failure rather than a hung run.
      Assert.True(done.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the merged sequence to terminate.");

      lst.AssertEqual([]);
      Assert.True(err is TaskCanceledException tcException && tcException.Task == tcss[1].Task);
  }
  /// <summary>
  /// Merges three tasks of which the second is already canceled before subscribing. The
  /// merged sequence is expected to produce no values and to fail with a
  /// <see cref="TaskCanceledException"/> that refers to the canceled task.
  /// </summary>
  [TestMethod]
  public void Merge_TaskWithCompletionSource_Canceled_Sync()
  {
      var tcss = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
      };

      // Cancel one task before anything subscribes; the other two are left pending.
      tcss[1].SetCanceled();

      var res = Observable.Merge(Observable.Range(0, 3).Select(x => tcss[x].Task));

      var lst = new List<int>();
      var err = default(Exception);

      // The event is declared first so that it is disposed last, after the subscription
      // whose callbacks signal it.
      using var done = new ManualResetEvent(false);
      using var subscription = res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());

      // A bounded wait turns a lost notification into a test failure rather than a hung run.
      Assert.True(done.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the merged sequence to terminate.");

      lst.AssertEqual([]);
      Assert.True(err is TaskCanceledException tcException && tcException.Task == tcss[1].Task);
  }
  /// <summary>
  /// Merges tasks selected by a subject where every task has its result set before the
  /// subject completes, and checks that all three results are delivered.
  /// </summary>
  [TestMethod]
  public void Merge_TaskWithCompletionSource_InnerCompleteBeforeOuter()
  {
      var xs = new Subject<int>();

      var tcss = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
      };

      var res = Observable.Merge(xs.Select(x => tcss[x].Task));

      var lst = new List<int>();

      // The event is declared first so that it is disposed last, after the subscription
      // whose callback signals it.
      using var done = new ManualResetEvent(false);
      using var subscription = res.Subscribe(lst.Add, () => done.Set());

      // One task is already completed by the time the subject hands it out.
      tcss[1].SetResult(42);

      xs.OnNext(0);
      xs.OnNext(1);
      xs.OnNext(2);

      // The remaining tasks get their results before the outer sequence completes.
      tcss[0].SetResult(43);
      tcss[2].SetResult(44);

      xs.OnCompleted();

      // A bounded wait turns a lost completion into a test failure rather than a hung run.
      Assert.True(done.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the merged sequence to complete.");

      lst.OrderBy(x => x).AssertEqual([42, 43, 44]);
  }
  /// <summary>
  /// Merges tasks selected by a subject where the subject completes while two of the tasks
  /// are still pending, and checks that all three results are still delivered.
  /// </summary>
  [TestMethod]
  public void Merge_TaskWithCompletionSource_OuterCompleteBeforeInner()
  {
      var xs = new Subject<int>();

      var tcss = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
      };

      var res = Observable.Merge(xs.Select(x => tcss[x].Task));

      var lst = new List<int>();

      // The event is declared first so that it is disposed last, after the subscription
      // whose callback signals it.
      using var done = new ManualResetEvent(false);
      using var subscription = res.Subscribe(lst.Add, () => done.Set());

      // One task is already completed by the time the subject hands it out.
      tcss[1].SetResult(42);

      xs.OnNext(0);
      xs.OnNext(1);
      xs.OnNext(2);

      // The outer sequence completes first; the remaining tasks get their results afterwards.
      xs.OnCompleted();

      tcss[0].SetResult(43);
      tcss[2].SetResult(44);

      // A bounded wait turns a lost completion into a test failure rather than a hung run.
      Assert.True(done.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the merged sequence to complete.");

      lst.OrderBy(x => x).AssertEqual([42, 43, 44]);
  }
  /// <summary>
  /// Merges tasks selected by a subject and then fails the subject itself. The merged
  /// sequence is expected to report the subject's exception.
  /// </summary>
  [TestMethod]
  public void Merge_Task_OnError()
  {
      var xs = new Subject<int>();

      var tcss = new[]
      {
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
          new TaskCompletionSource<int>(),
      };

      var res = Observable.Merge(xs.Select(x => tcss[x].Task));

      // Values are collected but deliberately not asserted; only the error is checked.
      var lst = new List<int>();
      var err = default(Exception);

      // The event is declared first so that it is disposed last, after the subscription
      // whose callbacks signal it.
      using var done = new ManualResetEvent(false);
      using var subscription = res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());

      tcss[1].SetResult(42);

      xs.OnNext(0);
      xs.OnNext(1);
      xs.OnNext(2);

      tcss[0].SetResult(43);
      tcss[2].SetResult(44);

      // Fail the outer sequence after all tasks have been given a result.
      var ex = new Exception();
      xs.OnError(ex);

      // A bounded wait turns a lost notification into a test failure rather than a hung run.
      Assert.True(done.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the merged sequence to terminate.");

      Assert.Same(ex, err);
  }
  1581. }
  1582. }