TakeTest.cs 23 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class TakeTest : ReactiveTest
  22. {
  23. #region + Count +
  24. [Fact]
  25. public void Take_ArgumentChecking()
  26. {
  27. ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).Take(0));
  28. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Take(-1));
  29. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Take(1).Subscribe(null));
  30. ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).Take(0, Scheduler.Immediate));
  31. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Take(0, default(IScheduler)));
  32. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Take(-1, Scheduler.Immediate));
  33. }
  34. [Fact]
  35. public void Take_Complete_After()
  36. {
  37. var scheduler = new TestScheduler();
  38. var xs = scheduler.CreateHotObservable(
  39. OnNext(70, 6),
  40. OnNext(150, 4),
  41. OnNext(210, 9),
  42. OnNext(230, 13),
  43. OnNext(270, 7),
  44. OnNext(280, 1),
  45. OnNext(300, -1),
  46. OnNext(310, 3),
  47. OnNext(340, 8),
  48. OnNext(370, 11),
  49. OnNext(410, 15),
  50. OnNext(415, 16),
  51. OnNext(460, 72),
  52. OnNext(510, 76),
  53. OnNext(560, 32),
  54. OnNext(570, -100),
  55. OnNext(580, -3),
  56. OnNext(590, 5),
  57. OnNext(630, 10),
  58. OnCompleted<int>(690)
  59. );
  60. var res = scheduler.Start(() =>
  61. xs.Take(20)
  62. );
  63. res.Messages.AssertEqual(
  64. OnNext(210, 9),
  65. OnNext(230, 13),
  66. OnNext(270, 7),
  67. OnNext(280, 1),
  68. OnNext(300, -1),
  69. OnNext(310, 3),
  70. OnNext(340, 8),
  71. OnNext(370, 11),
  72. OnNext(410, 15),
  73. OnNext(415, 16),
  74. OnNext(460, 72),
  75. OnNext(510, 76),
  76. OnNext(560, 32),
  77. OnNext(570, -100),
  78. OnNext(580, -3),
  79. OnNext(590, 5),
  80. OnNext(630, 10),
  81. OnCompleted<int>(690)
  82. );
  83. xs.Subscriptions.AssertEqual(
  84. Subscribe(200, 690)
  85. );
  86. }
  87. [Fact]
  88. public void Take_Complete_Same()
  89. {
  90. var scheduler = new TestScheduler();
  91. var xs = scheduler.CreateHotObservable(
  92. OnNext(70, 6),
  93. OnNext(150, 4),
  94. OnNext(210, 9),
  95. OnNext(230, 13),
  96. OnNext(270, 7),
  97. OnNext(280, 1),
  98. OnNext(300, -1),
  99. OnNext(310, 3),
  100. OnNext(340, 8),
  101. OnNext(370, 11),
  102. OnNext(410, 15),
  103. OnNext(415, 16),
  104. OnNext(460, 72),
  105. OnNext(510, 76),
  106. OnNext(560, 32),
  107. OnNext(570, -100),
  108. OnNext(580, -3),
  109. OnNext(590, 5),
  110. OnNext(630, 10),
  111. OnCompleted<int>(690)
  112. );
  113. var res = scheduler.Start(() =>
  114. xs.Take(17)
  115. );
  116. res.Messages.AssertEqual(
  117. OnNext(210, 9),
  118. OnNext(230, 13),
  119. OnNext(270, 7),
  120. OnNext(280, 1),
  121. OnNext(300, -1),
  122. OnNext(310, 3),
  123. OnNext(340, 8),
  124. OnNext(370, 11),
  125. OnNext(410, 15),
  126. OnNext(415, 16),
  127. OnNext(460, 72),
  128. OnNext(510, 76),
  129. OnNext(560, 32),
  130. OnNext(570, -100),
  131. OnNext(580, -3),
  132. OnNext(590, 5),
  133. OnNext(630, 10),
  134. OnCompleted<int>(630)
  135. );
  136. xs.Subscriptions.AssertEqual(
  137. Subscribe(200, 630)
  138. );
  139. }
  140. [Fact]
  141. public void Take_Complete_Before()
  142. {
  143. var scheduler = new TestScheduler();
  144. var xs = scheduler.CreateHotObservable(
  145. OnNext(70, 6),
  146. OnNext(150, 4),
  147. OnNext(210, 9),
  148. OnNext(230, 13),
  149. OnNext(270, 7),
  150. OnNext(280, 1),
  151. OnNext(300, -1),
  152. OnNext(310, 3),
  153. OnNext(340, 8),
  154. OnNext(370, 11),
  155. OnNext(410, 15),
  156. OnNext(415, 16),
  157. OnNext(460, 72),
  158. OnNext(510, 76),
  159. OnNext(560, 32),
  160. OnNext(570, -100),
  161. OnNext(580, -3),
  162. OnNext(590, 5),
  163. OnNext(630, 10),
  164. OnCompleted<int>(690)
  165. );
  166. var res = scheduler.Start(() =>
  167. xs.Take(10)
  168. );
  169. res.Messages.AssertEqual(
  170. OnNext(210, 9),
  171. OnNext(230, 13),
  172. OnNext(270, 7),
  173. OnNext(280, 1),
  174. OnNext(300, -1),
  175. OnNext(310, 3),
  176. OnNext(340, 8),
  177. OnNext(370, 11),
  178. OnNext(410, 15),
  179. OnNext(415, 16),
  180. OnCompleted<int>(415)
  181. );
  182. xs.Subscriptions.AssertEqual(
  183. Subscribe(200, 415)
  184. );
  185. }
  186. [Fact]
  187. public void Take_Error_After()
  188. {
  189. var scheduler = new TestScheduler();
  190. var ex = new Exception();
  191. var xs = scheduler.CreateHotObservable(
  192. OnNext(70, 6),
  193. OnNext(150, 4),
  194. OnNext(210, 9),
  195. OnNext(230, 13),
  196. OnNext(270, 7),
  197. OnNext(280, 1),
  198. OnNext(300, -1),
  199. OnNext(310, 3),
  200. OnNext(340, 8),
  201. OnNext(370, 11),
  202. OnNext(410, 15),
  203. OnNext(415, 16),
  204. OnNext(460, 72),
  205. OnNext(510, 76),
  206. OnNext(560, 32),
  207. OnNext(570, -100),
  208. OnNext(580, -3),
  209. OnNext(590, 5),
  210. OnNext(630, 10),
  211. OnError<int>(690, ex)
  212. );
  213. var res = scheduler.Start(() =>
  214. xs.Take(20)
  215. );
  216. res.Messages.AssertEqual(
  217. OnNext(210, 9),
  218. OnNext(230, 13),
  219. OnNext(270, 7),
  220. OnNext(280, 1),
  221. OnNext(300, -1),
  222. OnNext(310, 3),
  223. OnNext(340, 8),
  224. OnNext(370, 11),
  225. OnNext(410, 15),
  226. OnNext(415, 16),
  227. OnNext(460, 72),
  228. OnNext(510, 76),
  229. OnNext(560, 32),
  230. OnNext(570, -100),
  231. OnNext(580, -3),
  232. OnNext(590, 5),
  233. OnNext(630, 10),
  234. OnError<int>(690, ex)
  235. );
  236. xs.Subscriptions.AssertEqual(
  237. Subscribe(200, 690)
  238. );
  239. }
  240. [Fact]
  241. public void Take_Error_Same()
  242. {
  243. var scheduler = new TestScheduler();
  244. var xs = scheduler.CreateHotObservable(
  245. OnNext(70, 6),
  246. OnNext(150, 4),
  247. OnNext(210, 9),
  248. OnNext(230, 13),
  249. OnNext(270, 7),
  250. OnNext(280, 1),
  251. OnNext(300, -1),
  252. OnNext(310, 3),
  253. OnNext(340, 8),
  254. OnNext(370, 11),
  255. OnNext(410, 15),
  256. OnNext(415, 16),
  257. OnNext(460, 72),
  258. OnNext(510, 76),
  259. OnNext(560, 32),
  260. OnNext(570, -100),
  261. OnNext(580, -3),
  262. OnNext(590, 5),
  263. OnNext(630, 10),
  264. OnError<int>(690, new Exception())
  265. );
  266. var res = scheduler.Start(() =>
  267. xs.Take(17)
  268. );
  269. res.Messages.AssertEqual(
  270. OnNext(210, 9),
  271. OnNext(230, 13),
  272. OnNext(270, 7),
  273. OnNext(280, 1),
  274. OnNext(300, -1),
  275. OnNext(310, 3),
  276. OnNext(340, 8),
  277. OnNext(370, 11),
  278. OnNext(410, 15),
  279. OnNext(415, 16),
  280. OnNext(460, 72),
  281. OnNext(510, 76),
  282. OnNext(560, 32),
  283. OnNext(570, -100),
  284. OnNext(580, -3),
  285. OnNext(590, 5),
  286. OnNext(630, 10),
  287. OnCompleted<int>(630)
  288. );
  289. xs.Subscriptions.AssertEqual(
  290. Subscribe(200, 630)
  291. );
  292. }
  293. [Fact]
  294. public void Take_Error_Before()
  295. {
  296. var scheduler = new TestScheduler();
  297. var xs = scheduler.CreateHotObservable(
  298. OnNext(70, 6),
  299. OnNext(150, 4),
  300. OnNext(210, 9),
  301. OnNext(230, 13),
  302. OnNext(270, 7),
  303. OnNext(280, 1),
  304. OnNext(300, -1),
  305. OnNext(310, 3),
  306. OnNext(340, 8),
  307. OnNext(370, 11),
  308. OnNext(410, 15),
  309. OnNext(415, 16),
  310. OnNext(460, 72),
  311. OnNext(510, 76),
  312. OnNext(560, 32),
  313. OnNext(570, -100),
  314. OnNext(580, -3),
  315. OnNext(590, 5),
  316. OnNext(630, 10),
  317. OnError<int>(690, new Exception())
  318. );
  319. var res = scheduler.Start(() =>
  320. xs.Take(3)
  321. );
  322. res.Messages.AssertEqual(
  323. OnNext(210, 9),
  324. OnNext(230, 13),
  325. OnNext(270, 7),
  326. OnCompleted<int>(270)
  327. );
  328. xs.Subscriptions.AssertEqual(
  329. Subscribe(200, 270)
  330. );
  331. }
  332. [Fact]
  333. public void Take_Dispose_Before()
  334. {
  335. var scheduler = new TestScheduler();
  336. var xs = scheduler.CreateHotObservable(
  337. OnNext(70, 6),
  338. OnNext(150, 4),
  339. OnNext(210, 9),
  340. OnNext(230, 13),
  341. OnNext(270, 7),
  342. OnNext(280, 1),
  343. OnNext(300, -1),
  344. OnNext(310, 3),
  345. OnNext(340, 8),
  346. OnNext(370, 11),
  347. OnNext(410, 15),
  348. OnNext(415, 16),
  349. OnNext(460, 72),
  350. OnNext(510, 76),
  351. OnNext(560, 32),
  352. OnNext(570, -100),
  353. OnNext(580, -3),
  354. OnNext(590, 5),
  355. OnNext(630, 10)
  356. );
  357. var res = scheduler.Start(() =>
  358. xs.Take(3),
  359. 250
  360. );
  361. res.Messages.AssertEqual(
  362. OnNext(210, 9),
  363. OnNext(230, 13)
  364. );
  365. xs.Subscriptions.AssertEqual(
  366. Subscribe(200, 250)
  367. );
  368. }
  369. [Fact]
  370. public void Take_Dispose_After()
  371. {
  372. var scheduler = new TestScheduler();
  373. var xs = scheduler.CreateHotObservable(
  374. OnNext(70, 6),
  375. OnNext(150, 4),
  376. OnNext(210, 9),
  377. OnNext(230, 13),
  378. OnNext(270, 7),
  379. OnNext(280, 1),
  380. OnNext(300, -1),
  381. OnNext(310, 3),
  382. OnNext(340, 8),
  383. OnNext(370, 11),
  384. OnNext(410, 15),
  385. OnNext(415, 16),
  386. OnNext(460, 72),
  387. OnNext(510, 76),
  388. OnNext(560, 32),
  389. OnNext(570, -100),
  390. OnNext(580, -3),
  391. OnNext(590, 5),
  392. OnNext(630, 10)
  393. );
  394. var res = scheduler.Start(() =>
  395. xs.Take(3),
  396. 400
  397. );
  398. res.Messages.AssertEqual(
  399. OnNext(210, 9),
  400. OnNext(230, 13),
  401. OnNext(270, 7),
  402. OnCompleted<int>(270)
  403. );
  404. xs.Subscriptions.AssertEqual(
  405. Subscribe(200, 270)
  406. );
  407. }
  408. [Fact]
  409. public void Take_0_Scheduler()
  410. {
  411. var scheduler = new TestScheduler();
  412. var xs = scheduler.CreateHotObservable(
  413. OnNext(70, 6),
  414. OnNext(150, 4),
  415. OnNext(210, 9),
  416. OnNext(230, 13)
  417. );
  418. var res = scheduler.Start(() =>
  419. xs.Take(0, scheduler)
  420. );
  421. res.Messages.AssertEqual(
  422. OnCompleted<int>(200 + 1) // Extra scheduling call by Empty
  423. );
  424. xs.Subscriptions.AssertEqual(
  425. );
  426. }
  427. [Fact]
  428. public void Take_0_DefaultScheduler()
  429. {
  430. var scheduler = new TestScheduler();
  431. var xs = scheduler.CreateHotObservable(
  432. OnNext(70, 6),
  433. OnNext(150, 4),
  434. OnNext(210, 9),
  435. OnNext(230, 13)
  436. );
  437. var res = scheduler.Start(() =>
  438. xs.Take(0)
  439. );
  440. res.Messages.AssertEqual(
  441. OnCompleted<int>(200) // Immediate
  442. );
  443. xs.Subscriptions.AssertEqual(
  444. );
  445. }
  446. [Fact]
  447. public void Take_Non0_Scheduler()
  448. {
  449. var scheduler = new TestScheduler();
  450. var xs = scheduler.CreateHotObservable(
  451. OnNext(70, 6),
  452. OnNext(150, 4),
  453. OnNext(210, 9),
  454. OnNext(230, 13)
  455. );
  456. var res = scheduler.Start(() =>
  457. xs.Take(1, scheduler)
  458. );
  459. res.Messages.AssertEqual(
  460. OnNext(210, 9),
  461. OnCompleted<int>(210)
  462. );
  463. xs.Subscriptions.AssertEqual(
  464. Subscribe(200, 210)
  465. );
  466. }
  467. [Fact]
  468. public void Take_Take1()
  469. {
  470. var scheduler = new TestScheduler();
  471. var xs = scheduler.CreateHotObservable(
  472. OnNext(70, 6),
  473. OnNext(150, 4),
  474. OnNext(210, 9),
  475. OnNext(230, 13),
  476. OnNext(270, 7),
  477. OnNext(280, 1),
  478. OnNext(300, -1),
  479. OnNext(310, 3),
  480. OnNext(340, 8),
  481. OnNext(370, 11),
  482. OnCompleted<int>(400)
  483. );
  484. var res = scheduler.Start(() =>
  485. xs.Take(3).Take(4)
  486. );
  487. res.Messages.AssertEqual(
  488. OnNext(210, 9),
  489. OnNext(230, 13),
  490. OnNext(270, 7),
  491. OnCompleted<int>(270)
  492. );
  493. xs.Subscriptions.AssertEqual(
  494. Subscribe(200, 270)
  495. );
  496. }
  497. [Fact]
  498. public void Take_Take2()
  499. {
  500. var scheduler = new TestScheduler();
  501. var xs = scheduler.CreateHotObservable(
  502. OnNext(70, 6),
  503. OnNext(150, 4),
  504. OnNext(210, 9),
  505. OnNext(230, 13),
  506. OnNext(270, 7),
  507. OnNext(280, 1),
  508. OnNext(300, -1),
  509. OnNext(310, 3),
  510. OnNext(340, 8),
  511. OnNext(370, 11),
  512. OnCompleted<int>(400)
  513. );
  514. var res = scheduler.Start(() =>
  515. xs.Take(4).Take(3)
  516. );
  517. res.Messages.AssertEqual(
  518. OnNext(210, 9),
  519. OnNext(230, 13),
  520. OnNext(270, 7),
  521. OnCompleted<int>(270)
  522. );
  523. xs.Subscriptions.AssertEqual(
  524. Subscribe(200, 270)
  525. );
  526. }
  527. [Fact]
  528. public void Take_DecrementsCountFirst()
  529. {
  530. var k = new BehaviorSubject<bool>(true);
  531. k.Take(1).Subscribe(b => k.OnNext(!b));
  532. //
  533. // No assert needed; test will stack overflow for failure.
  534. //
  535. }
  536. #endregion
  537. #region + Timed +
  538. [Fact]
  539. public void Take_Timed_ArgumentChecking()
  540. {
  541. var xs = Observable.Return(42);
  542. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Take(default(IObservable<int>), TimeSpan.FromSeconds(1)));
  543. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Take(xs, TimeSpan.FromSeconds(-1)));
  544. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Take(default(IObservable<int>), TimeSpan.FromSeconds(1), Scheduler.Default));
  545. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Take(xs, TimeSpan.FromSeconds(1), default(IScheduler)));
  546. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Take(xs, TimeSpan.FromSeconds(-1), Scheduler.Default));
  547. }
  548. [Fact]
  549. public void Take_Zero()
  550. {
  551. var scheduler = new TestScheduler();
  552. var xs = scheduler.CreateHotObservable<int>(
  553. OnNext(210, 1),
  554. OnNext(220, 2),
  555. OnCompleted<int>(230)
  556. );
  557. var res = scheduler.Start(() =>
  558. xs.Take(TimeSpan.Zero, scheduler)
  559. );
  560. res.Messages.AssertEqual(
  561. OnCompleted<int>(201)
  562. );
  563. xs.Subscriptions.AssertEqual(
  564. Subscribe(200, 201)
  565. );
  566. }
  567. [Fact]
  568. public void Take_Some()
  569. {
  570. var scheduler = new TestScheduler();
  571. var xs = scheduler.CreateHotObservable<int>(
  572. OnNext(210, 1),
  573. OnNext(220, 2),
  574. OnNext(230, 3),
  575. OnCompleted<int>(240)
  576. );
  577. var res = scheduler.Start(() =>
  578. xs.Take(TimeSpan.FromTicks(25), scheduler)
  579. );
  580. res.Messages.AssertEqual(
  581. OnNext(210, 1),
  582. OnNext(220, 2),
  583. OnCompleted<int>(225)
  584. );
  585. xs.Subscriptions.AssertEqual(
  586. Subscribe(200, 225)
  587. );
  588. }
  589. [Fact]
  590. public void Take_Late()
  591. {
  592. var scheduler = new TestScheduler();
  593. var xs = scheduler.CreateHotObservable<int>(
  594. OnNext(210, 1),
  595. OnNext(220, 2),
  596. OnCompleted<int>(230)
  597. );
  598. var res = scheduler.Start(() =>
  599. xs.Take(TimeSpan.FromTicks(50), scheduler)
  600. );
  601. res.Messages.AssertEqual(
  602. OnNext(210, 1),
  603. OnNext(220, 2),
  604. OnCompleted<int>(230)
  605. );
  606. xs.Subscriptions.AssertEqual(
  607. Subscribe(200, 230)
  608. );
  609. }
  610. [Fact]
  611. public void Take_Error()
  612. {
  613. var scheduler = new TestScheduler();
  614. var ex = new Exception();
  615. var xs = scheduler.CreateHotObservable<int>(
  616. OnError<int>(210, ex)
  617. );
  618. var res = scheduler.Start(() =>
  619. xs.Take(TimeSpan.FromTicks(50), scheduler)
  620. );
  621. res.Messages.AssertEqual(
  622. OnError<int>(210, ex)
  623. );
  624. xs.Subscriptions.AssertEqual(
  625. Subscribe(200, 210)
  626. );
  627. }
  628. [Fact]
  629. public void Take_Never()
  630. {
  631. var scheduler = new TestScheduler();
  632. var ex = new Exception();
  633. var xs = scheduler.CreateHotObservable<int>(
  634. );
  635. var res = scheduler.Start(() =>
  636. xs.Take(TimeSpan.FromTicks(50), scheduler)
  637. );
  638. res.Messages.AssertEqual(
  639. OnCompleted<int>(250)
  640. );
  641. xs.Subscriptions.AssertEqual(
  642. Subscribe(200, 250)
  643. );
  644. }
  645. [Fact]
  646. public void Take_Twice1()
  647. {
  648. var scheduler = new TestScheduler();
  649. var ex = new Exception();
  650. var xs = scheduler.CreateHotObservable<int>(
  651. OnNext(210, 1),
  652. OnNext(220, 2),
  653. OnNext(230, 3),
  654. OnNext(240, 4),
  655. OnNext(250, 5),
  656. OnNext(260, 6),
  657. OnCompleted<int>(270)
  658. );
  659. var res = scheduler.Start(() =>
  660. xs.Take(TimeSpan.FromTicks(55), scheduler).Take(TimeSpan.FromTicks(35), scheduler)
  661. );
  662. res.Messages.AssertEqual(
  663. OnNext(210, 1),
  664. OnNext(220, 2),
  665. OnNext(230, 3),
  666. OnCompleted<int>(235)
  667. );
  668. xs.Subscriptions.AssertEqual(
  669. Subscribe(200, 235)
  670. );
  671. }
  672. [Fact]
  673. public void Take_Twice2()
  674. {
  675. var scheduler = new TestScheduler();
  676. var ex = new Exception();
  677. var xs = scheduler.CreateHotObservable<int>(
  678. OnNext(210, 1),
  679. OnNext(220, 2),
  680. OnNext(230, 3),
  681. OnNext(240, 4),
  682. OnNext(250, 5),
  683. OnNext(260, 6),
  684. OnCompleted<int>(270)
  685. );
  686. var res = scheduler.Start(() =>
  687. xs.Take(TimeSpan.FromTicks(35), scheduler).Take(TimeSpan.FromTicks(55), scheduler)
  688. );
  689. res.Messages.AssertEqual(
  690. OnNext(210, 1),
  691. OnNext(220, 2),
  692. OnNext(230, 3),
  693. OnCompleted<int>(235)
  694. );
  695. xs.Subscriptions.AssertEqual(
  696. Subscribe(200, 235)
  697. );
  698. }
  699. [Fact]
  700. public void Take_Default()
  701. {
  702. var xs = Observable.Range(0, 10, Scheduler.Default);
  703. var res = xs.Take(TimeSpan.FromSeconds(60));
  704. var e = new ManualResetEvent(false);
  705. var lst = new List<int>();
  706. res.Subscribe(
  707. lst.Add,
  708. () => e.Set()
  709. );
  710. e.WaitOne();
  711. Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10)));
  712. }
  713. #endregion
  714. }
  715. }