WhereTest.cs 25 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class WhereTest : ReactiveTest
  22. {
  23. [Fact]
  24. public void Where_ArgumentChecking()
  25. {
  26. ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).Where<int>(DummyFunc<int, bool>.Instance));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Where<int>((Func<int, bool>)null));
  28. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Where<int>(DummyFunc<int, bool>.Instance).Subscribe(null));
  29. }
  30. static bool IsPrime(int i)
  31. {
  32. if (i <= 1)
  33. return false;
  34. var max = (int)Math.Sqrt(i);
  35. for (var j = 2; j <= max; ++j)
  36. if (i % j == 0)
  37. return false;
  38. return true;
  39. }
  40. [Fact]
  41. public void Where_Complete()
  42. {
  43. var scheduler = new TestScheduler();
  44. var invoked = 0;
  45. var xs = scheduler.CreateHotObservable(
  46. OnNext(110, 1),
  47. OnNext(180, 2),
  48. OnNext(230, 3),
  49. OnNext(270, 4),
  50. OnNext(340, 5),
  51. OnNext(380, 6),
  52. OnNext(390, 7),
  53. OnNext(450, 8),
  54. OnNext(470, 9),
  55. OnNext(560, 10),
  56. OnNext(580, 11),
  57. OnCompleted<int>(600),
  58. OnNext(610, 12),
  59. OnError<int>(620, new Exception()),
  60. OnCompleted<int>(630)
  61. );
  62. var res = scheduler.Start(() =>
  63. xs.Where(x =>
  64. {
  65. invoked++;
  66. return IsPrime(x);
  67. })
  68. );
  69. res.Messages.AssertEqual(
  70. OnNext(230, 3),
  71. OnNext(340, 5),
  72. OnNext(390, 7),
  73. OnNext(580, 11),
  74. OnCompleted<int>(600)
  75. );
  76. xs.Subscriptions.AssertEqual(
  77. Subscribe(200, 600)
  78. );
  79. Assert.Equal(9, invoked);
  80. }
  81. [Fact]
  82. public void Where_True()
  83. {
  84. var scheduler = new TestScheduler();
  85. var invoked = 0;
  86. var xs = scheduler.CreateHotObservable(
  87. OnNext(110, 1),
  88. OnNext(180, 2),
  89. OnNext(230, 3),
  90. OnNext(270, 4),
  91. OnNext(340, 5),
  92. OnNext(380, 6),
  93. OnNext(390, 7),
  94. OnNext(450, 8),
  95. OnNext(470, 9),
  96. OnNext(560, 10),
  97. OnNext(580, 11),
  98. OnCompleted<int>(600)
  99. );
  100. var res = scheduler.Start(() =>
  101. xs.Where(x =>
  102. {
  103. invoked++;
  104. return true;
  105. })
  106. );
  107. res.Messages.AssertEqual(
  108. OnNext(230, 3),
  109. OnNext(270, 4),
  110. OnNext(340, 5),
  111. OnNext(380, 6),
  112. OnNext(390, 7),
  113. OnNext(450, 8),
  114. OnNext(470, 9),
  115. OnNext(560, 10),
  116. OnNext(580, 11),
  117. OnCompleted<int>(600)
  118. );
  119. xs.Subscriptions.AssertEqual(
  120. Subscribe(200, 600)
  121. );
  122. Assert.Equal(9, invoked);
  123. }
  124. [Fact]
  125. public void Where_False()
  126. {
  127. var scheduler = new TestScheduler();
  128. var invoked = 0;
  129. var xs = scheduler.CreateHotObservable(
  130. OnNext(110, 1),
  131. OnNext(180, 2),
  132. OnNext(230, 3),
  133. OnNext(270, 4),
  134. OnNext(340, 5),
  135. OnNext(380, 6),
  136. OnNext(390, 7),
  137. OnNext(450, 8),
  138. OnNext(470, 9),
  139. OnNext(560, 10),
  140. OnNext(580, 11),
  141. OnCompleted<int>(600)
  142. );
  143. var res = scheduler.Start(() =>
  144. xs.Where(x =>
  145. {
  146. invoked++;
  147. return false;
  148. })
  149. );
  150. res.Messages.AssertEqual(
  151. OnCompleted<int>(600)
  152. );
  153. xs.Subscriptions.AssertEqual(
  154. Subscribe(200, 600)
  155. );
  156. Assert.Equal(9, invoked);
  157. }
  158. [Fact]
  159. public void Where_Dispose()
  160. {
  161. var scheduler = new TestScheduler();
  162. var invoked = 0;
  163. var xs = scheduler.CreateHotObservable(
  164. OnNext(110, 1),
  165. OnNext(180, 2),
  166. OnNext(230, 3),
  167. OnNext(270, 4),
  168. OnNext(340, 5),
  169. OnNext(380, 6),
  170. OnNext(390, 7),
  171. OnNext(450, 8),
  172. OnNext(470, 9),
  173. OnNext(560, 10),
  174. OnNext(580, 11),
  175. OnCompleted<int>(600)
  176. );
  177. var res = scheduler.Start(() =>
  178. xs.Where(x =>
  179. {
  180. invoked++;
  181. return IsPrime(x);
  182. }),
  183. 400
  184. );
  185. res.Messages.AssertEqual(
  186. OnNext(230, 3),
  187. OnNext(340, 5),
  188. OnNext(390, 7)
  189. );
  190. xs.Subscriptions.AssertEqual(
  191. Subscribe(200, 400)
  192. );
  193. Assert.Equal(5, invoked);
  194. }
  195. [Fact]
  196. public void Where_Error()
  197. {
  198. var scheduler = new TestScheduler();
  199. var invoked = 0;
  200. var ex = new Exception();
  201. var xs = scheduler.CreateHotObservable(
  202. OnNext(110, 1),
  203. OnNext(180, 2),
  204. OnNext(230, 3),
  205. OnNext(270, 4),
  206. OnNext(340, 5),
  207. OnNext(380, 6),
  208. OnNext(390, 7),
  209. OnNext(450, 8),
  210. OnNext(470, 9),
  211. OnNext(560, 10),
  212. OnNext(580, 11),
  213. OnError<int>(600, ex),
  214. OnNext(610, 12),
  215. OnError<int>(620, new Exception()),
  216. OnCompleted<int>(630)
  217. );
  218. var res = scheduler.Start(() =>
  219. xs.Where(x =>
  220. {
  221. invoked++;
  222. return IsPrime(x);
  223. })
  224. );
  225. res.Messages.AssertEqual(
  226. OnNext(230, 3),
  227. OnNext(340, 5),
  228. OnNext(390, 7),
  229. OnNext(580, 11),
  230. OnError<int>(600, ex)
  231. );
  232. xs.Subscriptions.AssertEqual(
  233. Subscribe(200, 600)
  234. );
  235. Assert.Equal(9, invoked);
  236. }
  237. [Fact]
  238. public void Where_Throw()
  239. {
  240. var scheduler = new TestScheduler();
  241. var invoked = 0;
  242. var ex = new Exception();
  243. var xs = scheduler.CreateHotObservable(
  244. OnNext(110, 1),
  245. OnNext(180, 2),
  246. OnNext(230, 3),
  247. OnNext(270, 4),
  248. OnNext(340, 5),
  249. OnNext(380, 6),
  250. OnNext(390, 7),
  251. OnNext(450, 8),
  252. OnNext(470, 9),
  253. OnNext(560, 10),
  254. OnNext(580, 11),
  255. OnCompleted<int>(600),
  256. OnNext(610, 12),
  257. OnError<int>(620, new Exception()),
  258. OnCompleted<int>(630)
  259. );
  260. var res = scheduler.Start(() =>
  261. xs.Where(x =>
  262. {
  263. invoked++;
  264. if (x > 5)
  265. throw ex;
  266. return IsPrime(x);
  267. })
  268. );
  269. res.Messages.AssertEqual(
  270. OnNext(230, 3),
  271. OnNext(340, 5),
  272. OnError<int>(380, ex)
  273. );
  274. xs.Subscriptions.AssertEqual(
  275. Subscribe(200, 380)
  276. );
  277. Assert.Equal(4, invoked);
  278. }
  279. [Fact]
  280. public void Where_DisposeInPredicate()
  281. {
  282. var scheduler = new TestScheduler();
  283. var invoked = 0;
  284. var xs = scheduler.CreateHotObservable(
  285. OnNext(110, 1),
  286. OnNext(180, 2),
  287. OnNext(230, 3),
  288. OnNext(270, 4),
  289. OnNext(340, 5),
  290. OnNext(380, 6),
  291. OnNext(390, 7),
  292. OnNext(450, 8),
  293. OnNext(470, 9),
  294. OnNext(560, 10),
  295. OnNext(580, 11),
  296. OnCompleted<int>(600),
  297. OnNext(610, 12),
  298. OnError<int>(620, new Exception()),
  299. OnCompleted<int>(630)
  300. );
  301. var res = scheduler.CreateObserver<int>();
  302. var d = new SerialDisposable();
  303. var ys = default(IObservable<int>);
  304. scheduler.ScheduleAbsolute(Created, () => ys = xs.Where(x =>
  305. {
  306. invoked++;
  307. if (x == 8)
  308. d.Dispose();
  309. return IsPrime(x);
  310. }));
  311. scheduler.ScheduleAbsolute(Subscribed, () => d.Disposable = ys.Subscribe(res));
  312. scheduler.ScheduleAbsolute(Disposed, () => d.Dispose());
  313. scheduler.Start();
  314. res.Messages.AssertEqual(
  315. OnNext(230, 3),
  316. OnNext(340, 5),
  317. OnNext(390, 7)
  318. );
  319. xs.Subscriptions.AssertEqual(
  320. Subscribe(200, 450)
  321. );
  322. Assert.Equal(6, invoked);
  323. }
  324. [Fact]
  325. public void WhereWhereOptimization_Regular()
  326. {
  327. var scheduler = new TestScheduler();
  328. var xs = scheduler.CreateHotObservable(
  329. OnNext(110, 1),
  330. OnNext(180, 2),
  331. OnNext(230, 3),
  332. OnNext(270, 4),
  333. OnNext(340, 5),
  334. OnNext(380, 6),
  335. OnNext(390, 7),
  336. OnNext(450, 8),
  337. OnNext(470, 9),
  338. OnNext(560, 10),
  339. OnNext(580, 11),
  340. OnCompleted<int>(600)
  341. );
  342. var res = scheduler.Start(() =>
  343. xs.Where(x => x > 3).Where(x => x % 2 == 0)
  344. );
  345. res.Messages.AssertEqual(
  346. OnNext(270, 4),
  347. OnNext(380, 6),
  348. OnNext(450, 8),
  349. OnNext(560, 10),
  350. OnCompleted<int>(600)
  351. );
  352. }
  353. [Fact]
  354. public void WhereWhereOptimization_SecondPredicateThrows()
  355. {
  356. var scheduler = new TestScheduler();
  357. var xs = scheduler.CreateHotObservable(
  358. OnNext(110, 1),
  359. OnNext(180, 2),
  360. OnNext(230, 3),
  361. OnNext(270, 4),
  362. OnNext(340, 5),
  363. OnNext(380, 6),
  364. OnNext(390, 7),
  365. OnNext(450, 8),
  366. OnNext(470, 9),
  367. OnNext(560, 10),
  368. OnNext(580, 11),
  369. OnCompleted<int>(600)
  370. );
  371. var res = scheduler.Start(() =>
  372. xs.Where(x => x > 3).Where(x =>
  373. {
  374. if (x <= 3)
  375. throw new Exception();
  376. return x % 2 == 0;
  377. })
  378. );
  379. res.Messages.AssertEqual(
  380. OnNext(270, 4),
  381. OnNext(380, 6),
  382. OnNext(450, 8),
  383. OnNext(560, 10),
  384. OnCompleted<int>(600)
  385. );
  386. }
  387. [Fact]
  388. public void WhereIndex_ArgumentChecking()
  389. {
  390. ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).Where<int>(DummyFunc<int, int, bool>.Instance));
  391. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Where<int>((Func<int, int, bool>)null));
  392. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Where<int>(DummyFunc<int, int, bool>.Instance).Subscribe(null));
  393. }
  394. [Fact]
  395. public void WhereIndex_Complete()
  396. {
  397. var scheduler = new TestScheduler();
  398. var invoked = 0;
  399. var xs = scheduler.CreateHotObservable(
  400. OnNext(110, 1),
  401. OnNext(180, 2),
  402. OnNext(230, 3),
  403. OnNext(270, 4),
  404. OnNext(340, 5),
  405. OnNext(380, 6),
  406. OnNext(390, 7),
  407. OnNext(450, 8),
  408. OnNext(470, 9),
  409. OnNext(560, 10),
  410. OnNext(580, 11),
  411. OnCompleted<int>(600),
  412. OnNext(610, 12),
  413. OnError<int>(620, new Exception()),
  414. OnCompleted<int>(630)
  415. );
  416. var res = scheduler.Start(() =>
  417. xs.Where((x, i) =>
  418. {
  419. invoked++;
  420. return IsPrime(x + i * 10);
  421. })
  422. );
  423. res.Messages.AssertEqual(
  424. OnNext(230, 3),
  425. OnNext(390, 7),
  426. OnCompleted<int>(600)
  427. );
  428. xs.Subscriptions.AssertEqual(
  429. Subscribe(200, 600)
  430. );
  431. Assert.Equal(9, invoked);
  432. }
  433. [Fact]
  434. public void WhereIndex_True()
  435. {
  436. var scheduler = new TestScheduler();
  437. var invoked = 0;
  438. var xs = scheduler.CreateHotObservable(
  439. OnNext(110, 1),
  440. OnNext(180, 2),
  441. OnNext(230, 3),
  442. OnNext(270, 4),
  443. OnNext(340, 5),
  444. OnNext(380, 6),
  445. OnNext(390, 7),
  446. OnNext(450, 8),
  447. OnNext(470, 9),
  448. OnNext(560, 10),
  449. OnNext(580, 11),
  450. OnCompleted<int>(600)
  451. );
  452. var res = scheduler.Start(() =>
  453. xs.Where((x, i) =>
  454. {
  455. invoked++;
  456. return true;
  457. })
  458. );
  459. res.Messages.AssertEqual(
  460. OnNext(230, 3),
  461. OnNext(270, 4),
  462. OnNext(340, 5),
  463. OnNext(380, 6),
  464. OnNext(390, 7),
  465. OnNext(450, 8),
  466. OnNext(470, 9),
  467. OnNext(560, 10),
  468. OnNext(580, 11),
  469. OnCompleted<int>(600)
  470. );
  471. xs.Subscriptions.AssertEqual(
  472. Subscribe(200, 600)
  473. );
  474. Assert.Equal(9, invoked);
  475. }
  476. [Fact]
  477. public void WhereIndex_False()
  478. {
  479. var scheduler = new TestScheduler();
  480. var invoked = 0;
  481. var xs = scheduler.CreateHotObservable(
  482. OnNext(110, 1),
  483. OnNext(180, 2),
  484. OnNext(230, 3),
  485. OnNext(270, 4),
  486. OnNext(340, 5),
  487. OnNext(380, 6),
  488. OnNext(390, 7),
  489. OnNext(450, 8),
  490. OnNext(470, 9),
  491. OnNext(560, 10),
  492. OnNext(580, 11),
  493. OnCompleted<int>(600)
  494. );
  495. var res = scheduler.Start(() =>
  496. xs.Where((x, i) =>
  497. {
  498. invoked++;
  499. return false;
  500. })
  501. );
  502. res.Messages.AssertEqual(
  503. OnCompleted<int>(600)
  504. );
  505. xs.Subscriptions.AssertEqual(
  506. Subscribe(200, 600)
  507. );
  508. Assert.Equal(9, invoked);
  509. }
  510. [Fact]
  511. public void WhereIndex_Dispose()
  512. {
  513. var scheduler = new TestScheduler();
  514. var invoked = 0;
  515. var xs = scheduler.CreateHotObservable(
  516. OnNext(110, 1),
  517. OnNext(180, 2),
  518. OnNext(230, 3),
  519. OnNext(270, 4),
  520. OnNext(340, 5),
  521. OnNext(380, 6),
  522. OnNext(390, 7),
  523. OnNext(450, 8),
  524. OnNext(470, 9),
  525. OnNext(560, 10),
  526. OnNext(580, 11),
  527. OnCompleted<int>(600)
  528. );
  529. var res = scheduler.Start(() =>
  530. xs.Where((x, i) =>
  531. {
  532. invoked++;
  533. return IsPrime(x + i * 10);
  534. }),
  535. 400
  536. );
  537. res.Messages.AssertEqual(
  538. OnNext(230, 3),
  539. OnNext(390, 7)
  540. );
  541. xs.Subscriptions.AssertEqual(
  542. Subscribe(200, 400)
  543. );
  544. Assert.Equal(5, invoked);
  545. }
  546. [Fact]
  547. public void WhereIndex_Error()
  548. {
  549. var scheduler = new TestScheduler();
  550. var invoked = 0;
  551. var ex = new Exception();
  552. var xs = scheduler.CreateHotObservable(
  553. OnNext(110, 1),
  554. OnNext(180, 2),
  555. OnNext(230, 3),
  556. OnNext(270, 4),
  557. OnNext(340, 5),
  558. OnNext(380, 6),
  559. OnNext(390, 7),
  560. OnNext(450, 8),
  561. OnNext(470, 9),
  562. OnNext(560, 10),
  563. OnNext(580, 11),
  564. OnError<int>(600, ex),
  565. OnNext(610, 12),
  566. OnError<int>(620, new Exception()),
  567. OnCompleted<int>(630)
  568. );
  569. var res = scheduler.Start(() =>
  570. xs.Where((x, i) =>
  571. {
  572. invoked++;
  573. return IsPrime(x + i * 10);
  574. })
  575. );
  576. res.Messages.AssertEqual(
  577. OnNext(230, 3),
  578. OnNext(390, 7),
  579. OnError<int>(600, ex)
  580. );
  581. xs.Subscriptions.AssertEqual(
  582. Subscribe(200, 600)
  583. );
  584. Assert.Equal(9, invoked);
  585. }
  586. [Fact]
  587. public void WhereIndex_Throw()
  588. {
  589. var scheduler = new TestScheduler();
  590. var invoked = 0;
  591. var ex = new Exception();
  592. var xs = scheduler.CreateHotObservable(
  593. OnNext(110, 1),
  594. OnNext(180, 2),
  595. OnNext(230, 3),
  596. OnNext(270, 4),
  597. OnNext(340, 5),
  598. OnNext(380, 6),
  599. OnNext(390, 7),
  600. OnNext(450, 8),
  601. OnNext(470, 9),
  602. OnNext(560, 10),
  603. OnNext(580, 11),
  604. OnCompleted<int>(600),
  605. OnNext(610, 12),
  606. OnError<int>(620, new Exception()),
  607. OnCompleted<int>(630)
  608. );
  609. var res = scheduler.Start(() =>
  610. xs.Where((x, i) =>
  611. {
  612. invoked++;
  613. if (x > 5)
  614. throw ex;
  615. return IsPrime(x + i * 10);
  616. })
  617. );
  618. res.Messages.AssertEqual(
  619. OnNext(230, 3),
  620. OnError<int>(380, ex)
  621. );
  622. xs.Subscriptions.AssertEqual(
  623. Subscribe(200, 380)
  624. );
  625. Assert.Equal(4, invoked);
  626. }
  627. [Fact]
  628. public void WhereIndex_DisposeInPredicate()
  629. {
  630. var scheduler = new TestScheduler();
  631. var invoked = 0;
  632. var xs = scheduler.CreateHotObservable(
  633. OnNext(110, 1),
  634. OnNext(180, 2),
  635. OnNext(230, 3),
  636. OnNext(270, 4),
  637. OnNext(340, 5),
  638. OnNext(380, 6),
  639. OnNext(390, 7),
  640. OnNext(450, 8),
  641. OnNext(470, 9),
  642. OnNext(560, 10),
  643. OnNext(580, 11),
  644. OnCompleted<int>(600),
  645. OnNext(610, 12),
  646. OnError<int>(620, new Exception()),
  647. OnCompleted<int>(630)
  648. );
  649. var res = scheduler.CreateObserver<int>();
  650. var d = new SerialDisposable();
  651. var ys = default(IObservable<int>);
  652. scheduler.ScheduleAbsolute(Created, () => ys = xs.Where((x, i) =>
  653. {
  654. invoked++;
  655. if (x == 8)
  656. d.Dispose();
  657. return IsPrime(x + i * 10);
  658. }));
  659. scheduler.ScheduleAbsolute(Subscribed, () => d.Disposable = ys.Subscribe(res));
  660. scheduler.ScheduleAbsolute(Disposed, () => d.Dispose());
  661. scheduler.Start();
  662. res.Messages.AssertEqual(
  663. OnNext(230, 3),
  664. OnNext(390, 7)
  665. );
  666. xs.Subscriptions.AssertEqual(
  667. Subscribe(200, 450)
  668. );
  669. Assert.Equal(6, invoked);
  670. }
  671. [Fact]
  672. public void Where_Where1()
  673. {
  674. var scheduler = new TestScheduler();
  675. var xs = scheduler.CreateHotObservable(
  676. OnNext(110, 1),
  677. OnNext(180, 2),
  678. OnNext(230, 3),
  679. OnNext(270, 4),
  680. OnNext(340, 5),
  681. OnNext(380, 6),
  682. OnNext(390, 7),
  683. OnCompleted<int>(400)
  684. );
  685. var res = scheduler.Start(() =>
  686. xs.Where(x => x > 3).Where(x => x < 6)
  687. );
  688. res.Messages.AssertEqual(
  689. OnNext(270, 4),
  690. OnNext(340, 5),
  691. OnCompleted<int>(400)
  692. );
  693. xs.Subscriptions.AssertEqual(
  694. Subscribe(200, 400)
  695. );
  696. }
  697. [Fact]
  698. public void Where_Where2()
  699. {
  700. var scheduler = new TestScheduler();
  701. var xs = scheduler.CreateHotObservable(
  702. OnNext(110, 1),
  703. OnNext(180, 2),
  704. OnNext(230, 3),
  705. OnNext(270, 4),
  706. OnNext(340, 5),
  707. OnNext(380, 6),
  708. OnNext(390, 7),
  709. OnCompleted<int>(400)
  710. );
  711. var res = scheduler.Start(() =>
  712. xs.Where((x, i) => i >= 1).Where(x => x < 6)
  713. );
  714. res.Messages.AssertEqual(
  715. OnNext(270, 4),
  716. OnNext(340, 5),
  717. OnCompleted<int>(400)
  718. );
  719. xs.Subscriptions.AssertEqual(
  720. Subscribe(200, 400)
  721. );
  722. }
  723. [Fact]
  724. public void Where_Where3()
  725. {
  726. var scheduler = new TestScheduler();
  727. var xs = scheduler.CreateHotObservable(
  728. OnNext(110, 1),
  729. OnNext(180, 2),
  730. OnNext(230, 3),
  731. OnNext(270, 4),
  732. OnNext(340, 5),
  733. OnNext(380, 6),
  734. OnNext(390, 7),
  735. OnCompleted<int>(400)
  736. );
  737. var res = scheduler.Start(() =>
  738. xs.Where(x => x > 3).Where((x, i) => i < 2)
  739. );
  740. res.Messages.AssertEqual(
  741. OnNext(270, 4),
  742. OnNext(340, 5),
  743. OnCompleted<int>(400)
  744. );
  745. xs.Subscriptions.AssertEqual(
  746. Subscribe(200, 400)
  747. );
  748. }
  749. [Fact]
  750. public void Where_Where4()
  751. {
  752. var scheduler = new TestScheduler();
  753. var xs = scheduler.CreateHotObservable(
  754. OnNext(110, 1),
  755. OnNext(180, 2),
  756. OnNext(230, 3),
  757. OnNext(270, 4),
  758. OnNext(340, 5),
  759. OnNext(380, 6),
  760. OnNext(390, 7),
  761. OnCompleted<int>(400)
  762. );
  763. var res = scheduler.Start(() =>
  764. xs.Where((x, i) => i >= 1).Where((x, i) => i < 2)
  765. );
  766. res.Messages.AssertEqual(
  767. OnNext(270, 4),
  768. OnNext(340, 5),
  769. OnCompleted<int>(400)
  770. );
  771. xs.Subscriptions.AssertEqual(
  772. Subscribe(200, 400)
  773. );
  774. }
  775. }
  776. }