WhereTest.cs 25 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Linq;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Xunit;
  12. namespace ReactiveTests.Tests
  13. {
  14. public class WhereTest : ReactiveTest
  15. {
  // Checks that Where (predicate without index) throws ArgumentNullException for
  // a null source, a null predicate, and a null observer passed to Subscribe.
  [Fact]
  public void Where_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      Func<int, bool> nullPredicate = null;

      // Null source sequence.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => nullSource.Where(DummyFunc<int, bool>.Instance));

      // Null predicate.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => DummyObservable<int>.Instance.Where(nullPredicate));

      // Null observer handed to Subscribe on the filtered sequence.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => DummyObservable<int>.Instance.Where(DummyFunc<int, bool>.Instance).Subscribe(null));
  }
  // Trial-division primality check used by the predicates in this class.
  // Returns false for every value below 2; otherwise returns true only when no
  // integer from 2 up to the truncated square root of i divides i.
  private static bool IsPrime(int i)
  {
      if (i < 2)
      {
          return false;
      }

      var limit = (int)Math.Sqrt(i);
      var divisor = 2;
      while (divisor <= limit)
      {
          if (i % divisor == 0)
          {
              return false;
          }

          divisor++;
      }

      return true;
  }
  // Filters a hot sequence with a prime-number predicate and expects only the
  // primes plus the completion at 600; messages scheduled after the completion
  // must not show up in the result.
  [Fact]
  public void Where_Complete()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted while subscribed.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600),
          // Scheduled after the completion.
          OnNext(610, 12),
          OnError<int>(620, new Exception()),
          OnCompleted<int>(630)
      );

      Func<int, bool> countingIsPrime = value =>
      {
          predicateCalls++;
          return IsPrime(value);
      };

      var observer = testScheduler.Start(() => source.Where(countingIsPrime));

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(340, 5),
          OnNext(390, 7),
          OnNext(580, 11),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));

      // One predicate call per element delivered while subscribed (3 through 11).
      Assert.Equal(9, predicateCalls);
  }
  // With a predicate that always returns true, every element seen after the
  // subscription is forwarded unchanged, followed by the completion at 600.
  [Fact]
  public void Where_True()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted while subscribed.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      Func<int, bool> acceptAll = value =>
      {
          predicateCalls++;
          return true;
      };

      var observer = testScheduler.Start(() => source.Where(acceptAll));

      observer.Messages.AssertEqual(
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));

      Assert.Equal(9, predicateCalls);
  }
  // With a predicate that always returns false, no element is forwarded and
  // only the completion at 600 reaches the observer; the predicate is still
  // evaluated for each element seen while subscribed.
  [Fact]
  public void Where_False()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted while subscribed.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      Func<int, bool> rejectAll = value =>
      {
          predicateCalls++;
          return false;
      };

      var observer = testScheduler.Start(() => source.Where(rejectAll));

      observer.Messages.AssertEqual(OnCompleted<int>(600));

      source.Subscriptions.AssertEqual(Subscribe(200, 600));

      Assert.Equal(9, predicateCalls);
  }
  // Disposes the subscription at 400: only the primes emitted before that time
  // are observed, no completion is recorded, and the predicate has run for the
  // five elements delivered up to then.
  [Fact]
  public void Where_Dispose()
  {
      const long disposeTime = 400;

      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted before the disposal time.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7),
          // Emitted after the disposal time.
          OnNext(450, 8), OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      Func<int, bool> countingIsPrime = value =>
      {
          predicateCalls++;
          return IsPrime(value);
      };

      var observer = testScheduler.Start(() => source.Where(countingIsPrime), disposeTime);

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(340, 5),
          OnNext(390, 7)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, disposeTime));

      Assert.Equal(5, predicateCalls);
  }
  // The source fails at 600: the primes seen so far are forwarded and the very
  // same exception instance is propagated; later messages are not observed.
  [Fact]
  public void Where_Error()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;
      var sourceFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted while subscribed.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnError<int>(600, sourceFailure),
          // Scheduled after the error.
          OnNext(610, 12),
          OnError<int>(620, new Exception()),
          OnCompleted<int>(630)
      );

      Func<int, bool> countingIsPrime = value =>
      {
          predicateCalls++;
          return IsPrime(value);
      };

      var observer = testScheduler.Start(() => source.Where(countingIsPrime));

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(340, 5),
          OnNext(390, 7),
          OnNext(580, 11),
          OnError<int>(600, sourceFailure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));

      Assert.Equal(9, predicateCalls);
  }
  // The predicate throws for the first element greater than 5 (element 6 at 380):
  // the exception is surfaced as OnError at that time and the source
  // subscription ends there.
  [Fact]
  public void Where_Throw()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;
      var predicateFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600),
          OnNext(610, 12),
          OnError<int>(620, new Exception()),
          OnCompleted<int>(630)
      );

      Func<int, bool> throwingPredicate = value =>
      {
          predicateCalls++;

          if (value > 5)
          {
              // Deliberately throws the captured instance so it can be matched below.
              throw predicateFailure;
          }

          return IsPrime(value);
      };

      var observer = testScheduler.Start(() => source.Where(throwingPredicate));

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(340, 5),
          OnError<int>(380, predicateFailure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 380));

      // Called for 3, 4, 5 and the throwing call for 6.
      Assert.Equal(4, predicateCalls);
  }
  // The predicate itself disposes the subscription when it sees element 8
  // (time 450). Nothing may be emitted afterwards and the source subscription
  // must end at 450.
  [Fact]
  public void Where_DisposeInPredicate()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600),
          OnNext(610, 12),
          OnError<int>(620, new Exception()),
          OnCompleted<int>(630)
      );

      var observer = testScheduler.CreateObserver<int>();
      var subscription = new SerialDisposable();
      IObservable<int> filtered = null;

      Func<int, bool> selfDisposingPredicate = value =>
      {
          predicateCalls++;

          if (value == 8)
          {
              subscription.Dispose();
          }

          return IsPrime(value);
      };

      // Create, subscribe and dispose at the Created / Subscribed / Disposed times.
      testScheduler.ScheduleAbsolute(Created, () => filtered = source.Where(selfDisposingPredicate));
      testScheduler.ScheduleAbsolute(Subscribed, () => subscription.Disposable = filtered.Subscribe(observer));
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(340, 5),
          OnNext(390, 7)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 450));

      // Called for 3 through 8; the call for 8 is the one that disposes.
      Assert.Equal(6, predicateCalls);
  }
  // Two chained index-less Where calls: only elements that are greater than 3
  // and even are expected, followed by the completion at 600.
  [Fact]
  public void WhereWhereOptimization_Regular()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      Func<int, bool> greaterThanThree = value => value > 3;
      Func<int, bool> isEven = value => value % 2 == 0;

      var observer = testScheduler.Start(
          () => source.Where(greaterThanThree).Where(isEven));

      observer.Messages.AssertEqual(
          OnNext(270, 4),
          OnNext(380, 6),
          OnNext(450, 8),
          OnNext(560, 10),
          OnCompleted<int>(600)
      );
  }
  // Two chained index-less Where calls where the second predicate would throw
  // for any value of 3 or less. The expected output contains no error, i.e. the
  // second predicate must never be handed a value the first one rejected.
  [Fact]
  public void WhereWhereOptimization_SecondPredicateThrows()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      Func<int, bool> greaterThanThree = value => value > 3;

      Func<int, bool> isEvenOrThrows = value =>
      {
          if (value <= 3)
          {
              throw new Exception();
          }

          return value % 2 == 0;
      };

      var observer = testScheduler.Start(
          () => source.Where(greaterThanThree).Where(isEvenOrThrows));

      observer.Messages.AssertEqual(
          OnNext(270, 4),
          OnNext(380, 6),
          OnNext(450, 8),
          OnNext(560, 10),
          OnCompleted<int>(600)
      );
  }
  // Checks that Where (predicate with index) throws ArgumentNullException for
  // a null source, a null predicate, and a null observer passed to Subscribe.
  [Fact]
  public void WhereIndex_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      Func<int, int, bool> nullPredicate = null;

      // Null source sequence.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => nullSource.Where(DummyFunc<int, int, bool>.Instance));

      // Null predicate.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => DummyObservable<int>.Instance.Where(nullPredicate));

      // Null observer handed to Subscribe on the filtered sequence.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => DummyObservable<int>.Instance.Where(DummyFunc<int, int, bool>.Instance).Subscribe(null));
  }
  // Indexed Where: an element passes when (value + index * 10) is prime.
  // The expected output is 3 and 7 plus the completion at 600; messages
  // scheduled after the completion must not show up.
  [Fact]
  public void WhereIndex_Complete()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted while subscribed.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600),
          // Scheduled after the completion.
          OnNext(610, 12),
          OnError<int>(620, new Exception()),
          OnCompleted<int>(630)
      );

      Func<int, int, bool> countingIndexedPrime = (value, index) =>
      {
          predicateCalls++;
          return IsPrime(value + index * 10);
      };

      var observer = testScheduler.Start(() => source.Where(countingIndexedPrime));

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(390, 7),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));

      Assert.Equal(9, predicateCalls);
  }
  // Indexed Where with a predicate that always returns true: every element
  // seen after the subscription is forwarded, followed by the completion.
  [Fact]
  public void WhereIndex_True()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted while subscribed.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      Func<int, int, bool> acceptAll = (value, index) =>
      {
          predicateCalls++;
          return true;
      };

      var observer = testScheduler.Start(() => source.Where(acceptAll));

      observer.Messages.AssertEqual(
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));

      Assert.Equal(9, predicateCalls);
  }
  // Indexed Where with a predicate that always returns false: only the
  // completion at 600 is observed, yet the predicate still runs per element.
  [Fact]
  public void WhereIndex_False()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted while subscribed.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      Func<int, int, bool> rejectAll = (value, index) =>
      {
          predicateCalls++;
          return false;
      };

      var observer = testScheduler.Start(() => source.Where(rejectAll));

      observer.Messages.AssertEqual(OnCompleted<int>(600));

      source.Subscriptions.AssertEqual(Subscribe(200, 600));

      Assert.Equal(9, predicateCalls);
  }
  // Indexed Where disposed at 400: only matches emitted before that time are
  // observed, no completion is recorded, and the predicate has run five times.
  [Fact]
  public void WhereIndex_Dispose()
  {
      const long disposeTime = 400;

      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted before the disposal time.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7),
          // Emitted after the disposal time.
          OnNext(450, 8), OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600)
      );

      Func<int, int, bool> countingIndexedPrime = (value, index) =>
      {
          predicateCalls++;
          return IsPrime(value + index * 10);
      };

      var observer = testScheduler.Start(() => source.Where(countingIndexedPrime), disposeTime);

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(390, 7)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, disposeTime));

      Assert.Equal(5, predicateCalls);
  }
  // Indexed Where over a source that fails at 600: matches seen so far are
  // forwarded and the same exception instance is propagated; later messages
  // are not observed.
  [Fact]
  public void WhereIndex_Error()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;
      var sourceFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          // Emitted while subscribed.
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnError<int>(600, sourceFailure),
          // Scheduled after the error.
          OnNext(610, 12),
          OnError<int>(620, new Exception()),
          OnCompleted<int>(630)
      );

      Func<int, int, bool> countingIndexedPrime = (value, index) =>
      {
          predicateCalls++;
          return IsPrime(value + index * 10);
      };

      var observer = testScheduler.Start(() => source.Where(countingIndexedPrime));

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(390, 7),
          OnError<int>(600, sourceFailure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));

      Assert.Equal(9, predicateCalls);
  }
  // Indexed Where whose predicate throws for the first element greater than 5
  // (element 6 at 380): the exception is surfaced as OnError at that time and
  // the source subscription ends there.
  [Fact]
  public void WhereIndex_Throw()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;
      var predicateFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600),
          OnNext(610, 12),
          OnError<int>(620, new Exception()),
          OnCompleted<int>(630)
      );

      Func<int, int, bool> throwingPredicate = (value, index) =>
      {
          predicateCalls++;

          if (value > 5)
          {
              // Deliberately throws the captured instance so it can be matched below.
              throw predicateFailure;
          }

          return IsPrime(value + index * 10);
      };

      var observer = testScheduler.Start(() => source.Where(throwingPredicate));

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnError<int>(380, predicateFailure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 380));

      // Called for 3, 4, 5 and the throwing call for 6.
      Assert.Equal(4, predicateCalls);
  }
  // Indexed Where whose predicate disposes the subscription when it sees
  // element 8 (time 450). Nothing may be emitted afterwards and the source
  // subscription must end at 450.
  [Fact]
  public void WhereIndex_DisposeInPredicate()
  {
      var testScheduler = new TestScheduler();
      var predicateCalls = 0;

      var source = testScheduler.CreateHotObservable(
          // Emitted before the subscription at 200.
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7), OnNext(450, 8),
          OnNext(470, 9), OnNext(560, 10), OnNext(580, 11),
          OnCompleted<int>(600),
          OnNext(610, 12),
          OnError<int>(620, new Exception()),
          OnCompleted<int>(630)
      );

      var observer = testScheduler.CreateObserver<int>();
      var subscription = new SerialDisposable();
      IObservable<int> filtered = null;

      Func<int, int, bool> selfDisposingPredicate = (value, index) =>
      {
          predicateCalls++;

          if (value == 8)
          {
              subscription.Dispose();
          }

          return IsPrime(value + index * 10);
      };

      // Create, subscribe and dispose at the Created / Subscribed / Disposed times.
      testScheduler.ScheduleAbsolute(Created, () => filtered = source.Where(selfDisposingPredicate));
      testScheduler.ScheduleAbsolute(Subscribed, () => subscription.Disposable = filtered.Subscribe(observer));
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(390, 7)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 450));

      // Called for 3 through 8; the call for 8 is the one that disposes.
      Assert.Equal(6, predicateCalls);
  }
  // Chains two index-less Where calls (greater than 3, then less than 6) and
  // expects 4 and 5 plus the completion, with a single source subscription.
  [Fact]
  public void Where_Where1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7),
          OnCompleted<int>(400)
      );

      Func<int, bool> greaterThanThree = value => value > 3;
      Func<int, bool> lessThanSix = value => value < 6;

      var observer = testScheduler.Start(
          () => source.Where(greaterThanThree).Where(lessThanSix));

      observer.Messages.AssertEqual(
          OnNext(270, 4),
          OnNext(340, 5),
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  // Chains an indexed Where (index at least 1) with an index-less Where
  // (less than 6) and expects 4 and 5 plus the completion.
  [Fact]
  public void Where_Where2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7),
          OnCompleted<int>(400)
      );

      Func<int, int, bool> indexAtLeastOne = (value, index) => index >= 1;
      Func<int, bool> lessThanSix = value => value < 6;

      var observer = testScheduler.Start(
          () => source.Where(indexAtLeastOne).Where(lessThanSix));

      observer.Messages.AssertEqual(
          OnNext(270, 4),
          OnNext(340, 5),
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  // Chains an index-less Where (greater than 3) with an indexed Where
  // (index below 2) and expects 4 and 5 plus the completion.
  [Fact]
  public void Where_Where3()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7),
          OnCompleted<int>(400)
      );

      Func<int, bool> greaterThanThree = value => value > 3;
      Func<int, int, bool> indexBelowTwo = (value, index) => index < 2;

      var observer = testScheduler.Start(
          () => source.Where(greaterThanThree).Where(indexBelowTwo));

      observer.Messages.AssertEqual(
          OnNext(270, 4),
          OnNext(340, 5),
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  // Chains two indexed Where calls (index at least 1, then index below 2) and
  // expects 4 and 5 plus the completion.
  [Fact]
  public void Where_Where4()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1), OnNext(180, 2),
          OnNext(230, 3), OnNext(270, 4), OnNext(340, 5),
          OnNext(380, 6), OnNext(390, 7),
          OnCompleted<int>(400)
      );

      Func<int, int, bool> indexAtLeastOne = (value, index) => index >= 1;
      Func<int, int, bool> indexBelowTwo = (value, index) => index < 2;

      var observer = testScheduler.Start(
          () => source.Where(indexAtLeastOne).Where(indexBelowTwo));

      observer.Messages.AssertEqual(
          OnNext(270, 4),
          OnNext(340, 5),
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  784. }
  785. }