WindowTest.cs 57 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. using Assert = Xunit.Assert;
  13. namespace ReactiveTests.Tests
  14. {
  15. [TestClass]
  16. public class WindowTest : ReactiveTest
  17. {
  18. #region + Observable +
  19. [TestMethod]
  20. public void Window_ArgumentChecking()
  21. {
  22. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), DummyFunc<IObservable<int>>.Instance));
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(DummyObservable<int>.Instance, default(Func<IObservable<int>>)));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(DummyObservable<int>.Instance, default, DummyFunc<int, IObservable<int>>.Instance));
  26. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(DummyObservable<int>.Instance, DummyObservable<int>.Instance, default(Func<int, IObservable<int>>)));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), DummyObservable<int>.Instance));
  28. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(DummyObservable<int>.Instance, default(IObservable<int>)));
  29. }
  30. [TestMethod]
  31. public void Window_Closings_Basic()
  32. {
  33. var scheduler = new TestScheduler();
  34. var xs = scheduler.CreateHotObservable(
  35. OnNext(90, 1),
  36. OnNext(180, 2),
  37. OnNext(250, 3),
  38. OnNext(260, 4),
  39. OnNext(310, 5),
  40. OnNext(340, 6),
  41. OnNext(410, 7),
  42. OnNext(420, 8),
  43. OnNext(470, 9),
  44. OnNext(550, 10),
  45. OnCompleted<int>(590)
  46. );
  47. var window = 1;
  48. var res = scheduler.Start(() =>
  49. xs.Window(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  50. );
  51. res.Messages.AssertEqual(
  52. OnNext(250, "0 3"),
  53. OnNext(260, "0 4"),
  54. OnNext(310, "1 5"),
  55. OnNext(340, "1 6"),
  56. OnNext(410, "1 7"),
  57. OnNext(420, "1 8"),
  58. OnNext(470, "1 9"),
  59. OnNext(550, "2 10"),
  60. OnCompleted<string>(590)
  61. );
  62. xs.Subscriptions.AssertEqual(
  63. Subscribe(200, 590)
  64. );
  65. }
  66. [TestMethod]
  67. public void Window_Closings_InnerSubscriptions()
  68. {
  69. var scheduler = new TestScheduler();
  70. var xs = scheduler.CreateHotObservable(
  71. OnNext(90, 1),
  72. OnNext(180, 2),
  73. OnNext(250, 3),
  74. OnNext(260, 4),
  75. OnNext(310, 5),
  76. OnNext(340, 6),
  77. OnNext(410, 7),
  78. OnNext(420, 8),
  79. OnNext(470, 9),
  80. OnNext(550, 10),
  81. OnCompleted<int>(590)
  82. );
  83. var closings = new ITestableObservable<bool>[] {
  84. scheduler.CreateHotObservable(
  85. OnNext(300, true),
  86. OnNext(350, false),
  87. OnCompleted<bool>(380)
  88. ),
  89. scheduler.CreateHotObservable(
  90. OnNext(400, true),
  91. OnNext(510, false),
  92. OnNext(620, false)
  93. ),
  94. scheduler.CreateHotObservable(
  95. OnCompleted<bool>(500)
  96. ),
  97. scheduler.CreateHotObservable(
  98. OnNext(600, true)
  99. )
  100. };
  101. var window = 0;
  102. var res = scheduler.Start(() =>
  103. xs.Window(() => closings[window++]).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  104. );
  105. res.Messages.AssertEqual(
  106. OnNext(250, "0 3"),
  107. OnNext(260, "0 4"),
  108. OnNext(310, "1 5"),
  109. OnNext(340, "1 6"),
  110. OnNext(410, "2 7"),
  111. OnNext(420, "2 8"),
  112. OnNext(470, "2 9"),
  113. OnNext(550, "3 10"),
  114. OnCompleted<string>(590)
  115. );
  116. xs.Subscriptions.AssertEqual(
  117. Subscribe(200, 590)
  118. );
  119. closings[0].Subscriptions.AssertEqual(
  120. Subscribe(200, 300)
  121. );
  122. closings[1].Subscriptions.AssertEqual(
  123. Subscribe(300, 400)
  124. );
  125. closings[2].Subscriptions.AssertEqual(
  126. Subscribe(400, 500)
  127. );
  128. closings[3].Subscriptions.AssertEqual(
  129. Subscribe(500, 590)
  130. );
  131. }
  132. [TestMethod]
  133. public void Window_Closings_Empty()
  134. {
  135. var scheduler = new TestScheduler();
  136. var xs = scheduler.CreateHotObservable(
  137. OnNext(90, 1),
  138. OnNext(180, 2),
  139. OnNext(250, 3),
  140. OnNext(260, 4),
  141. OnNext(310, 5),
  142. OnNext(340, 6),
  143. OnNext(410, 7),
  144. OnNext(420, 8),
  145. OnNext(470, 9),
  146. OnNext(550, 10),
  147. OnCompleted<int>(590)
  148. );
  149. var window = 1;
  150. var res = scheduler.Start(() =>
  151. xs.Window(() => Observable.Empty<int>().Delay(TimeSpan.FromTicks((window++) * 100), scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  152. );
  153. res.Messages.AssertEqual(
  154. OnNext(250, "0 3"),
  155. OnNext(260, "0 4"),
  156. OnNext(310, "1 5"),
  157. OnNext(340, "1 6"),
  158. OnNext(410, "1 7"),
  159. OnNext(420, "1 8"),
  160. OnNext(470, "1 9"),
  161. OnNext(550, "2 10"),
  162. OnCompleted<string>(590)
  163. );
  164. xs.Subscriptions.AssertEqual(
  165. Subscribe(200, 590)
  166. );
  167. }
  168. [TestMethod]
  169. public void Window_Closings_Dispose()
  170. {
  171. var scheduler = new TestScheduler();
  172. var xs = scheduler.CreateHotObservable(
  173. OnNext(90, 1),
  174. OnNext(180, 2),
  175. OnNext(250, 3),
  176. OnNext(260, 4),
  177. OnNext(310, 5),
  178. OnNext(340, 6),
  179. OnNext(410, 7),
  180. OnNext(420, 8),
  181. OnNext(470, 9),
  182. OnNext(550, 10),
  183. OnCompleted<int>(590)
  184. );
  185. var window = 1;
  186. var res = scheduler.Start(() =>
  187. xs.Window(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(),
  188. 400
  189. );
  190. res.Messages.AssertEqual(
  191. OnNext(250, "0 3"),
  192. OnNext(260, "0 4"),
  193. OnNext(310, "1 5"),
  194. OnNext(340, "1 6")
  195. );
  196. xs.Subscriptions.AssertEqual(
  197. Subscribe(200, 400)
  198. );
  199. }
  200. [TestMethod]
  201. public void Window_Closings_Error()
  202. {
  203. var scheduler = new TestScheduler();
  204. var ex = new Exception();
  205. var xs = scheduler.CreateHotObservable(
  206. OnNext(90, 1),
  207. OnNext(180, 2),
  208. OnNext(250, 3),
  209. OnNext(260, 4),
  210. OnNext(310, 5),
  211. OnNext(340, 6),
  212. OnNext(410, 7),
  213. OnNext(420, 8),
  214. OnNext(470, 9),
  215. OnNext(550, 10),
  216. OnError<int>(590, ex)
  217. );
  218. var window = 1;
  219. var res = scheduler.Start(() =>
  220. xs.Window(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  221. );
  222. res.Messages.AssertEqual(
  223. OnNext(250, "0 3"),
  224. OnNext(260, "0 4"),
  225. OnNext(310, "1 5"),
  226. OnNext(340, "1 6"),
  227. OnNext(410, "1 7"),
  228. OnNext(420, "1 8"),
  229. OnNext(470, "1 9"),
  230. OnNext(550, "2 10"),
  231. OnError<string>(590, ex)
  232. );
  233. xs.Subscriptions.AssertEqual(
  234. Subscribe(200, 590)
  235. );
  236. }
  237. [TestMethod]
  238. public void Window_Closings_Throw()
  239. {
  240. var scheduler = new TestScheduler();
  241. var ex = new Exception();
  242. var xs = scheduler.CreateHotObservable(
  243. OnNext(90, 1),
  244. OnNext(180, 2),
  245. OnNext(250, 3),
  246. OnNext(260, 4),
  247. OnNext(310, 5),
  248. OnNext(340, 6),
  249. OnNext(410, 7),
  250. OnNext(420, 8),
  251. OnNext(470, 9),
  252. OnNext(550, 10),
  253. OnError<int>(590, new Exception())
  254. );
  255. var res = scheduler.Start(() =>
  256. xs.Window<int, int>(() => { throw ex; }).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  257. );
  258. res.Messages.AssertEqual(
  259. OnError<string>(200, ex)
  260. );
  261. xs.Subscriptions.AssertEqual(
  262. Subscribe(200, 200)
  263. );
  264. }
  265. [TestMethod]
  266. public void Window_Closings_WindowClose_Error()
  267. {
  268. var scheduler = new TestScheduler();
  269. var ex = new Exception();
  270. var xs = scheduler.CreateHotObservable(
  271. OnNext(90, 1),
  272. OnNext(180, 2),
  273. OnNext(250, 3),
  274. OnNext(260, 4),
  275. OnNext(310, 5),
  276. OnNext(340, 6),
  277. OnNext(410, 7),
  278. OnNext(420, 8),
  279. OnNext(470, 9),
  280. OnNext(550, 10),
  281. OnError<int>(590, new Exception())
  282. );
  283. var res = scheduler.Start(() =>
  284. xs.Window(() => Observable.Throw<int>(ex, scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  285. );
  286. res.Messages.AssertEqual(
  287. OnError<string>(201, ex)
  288. );
  289. xs.Subscriptions.AssertEqual(
  290. Subscribe(200, 201)
  291. );
  292. }
  293. [TestMethod]
  294. public void Window_Closings_Default()
  295. {
  296. var scheduler = new TestScheduler();
  297. var xs = scheduler.CreateHotObservable(
  298. OnNext(90, 1),
  299. OnNext(180, 2),
  300. OnNext(250, 3),
  301. OnNext(260, 4),
  302. OnNext(310, 5),
  303. OnNext(340, 6),
  304. OnNext(410, 7),
  305. OnNext(420, 8),
  306. OnNext(470, 9),
  307. OnNext(550, 10),
  308. OnCompleted<int>(590)
  309. );
  310. var window = 1;
  311. var res = scheduler.Start(() =>
  312. xs.Window(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  313. );
  314. res.Messages.AssertEqual(
  315. OnNext(250, "0 3"),
  316. OnNext(260, "0 4"),
  317. OnNext(310, "1 5"),
  318. OnNext(340, "1 6"),
  319. OnNext(410, "1 7"),
  320. OnNext(420, "1 8"),
  321. OnNext(470, "1 9"),
  322. OnNext(550, "2 10"),
  323. OnCompleted<string>(590)
  324. );
  325. xs.Subscriptions.AssertEqual(
  326. Subscribe(200, 590)
  327. );
  328. }
  329. [TestMethod]
  330. public void Window_OpeningClosings_Basic()
  331. {
  332. var scheduler = new TestScheduler();
  333. var xs = scheduler.CreateHotObservable(
  334. OnNext(90, 1),
  335. OnNext(180, 2),
  336. OnNext(250, 3),
  337. OnNext(260, 4),
  338. OnNext(310, 5),
  339. OnNext(340, 6),
  340. OnNext(410, 7),
  341. OnNext(420, 8),
  342. OnNext(470, 9),
  343. OnNext(550, 10),
  344. OnCompleted<int>(590)
  345. );
  346. var ys = scheduler.CreateHotObservable(
  347. OnNext(255, 50),
  348. OnNext(330, 100),
  349. OnNext(350, 50),
  350. OnNext(400, 90),
  351. OnCompleted<int>(900)
  352. );
  353. var res = scheduler.Start(() =>
  354. xs.Window(ys, x => Observable.Timer(TimeSpan.FromTicks(x), scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  355. );
  356. res.Messages.AssertEqual(
  357. OnNext(260, "0 4"),
  358. OnNext(340, "1 6"),
  359. OnNext(410, "1 7"),
  360. OnNext(410, "3 7"),
  361. OnNext(420, "1 8"),
  362. OnNext(420, "3 8"),
  363. OnNext(470, "3 9"),
  364. OnCompleted<string>(900)
  365. );
  366. #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
  367. xs.Subscriptions.AssertEqual(
  368. Subscribe(200, 590)
  369. );
  370. #else
  371. xs.Subscriptions.AssertEqual(
  372. Subscribe(200, 900)
  373. );
  374. #endif
  375. ys.Subscriptions.AssertEqual(
  376. Subscribe(200, 900)
  377. );
  378. }
  379. [TestMethod]
  380. public void Window_OpeningClosings_Throw()
  381. {
  382. var scheduler = new TestScheduler();
  383. var xs = scheduler.CreateHotObservable(
  384. OnNext(90, 1),
  385. OnNext(180, 2),
  386. OnNext(250, 3),
  387. OnNext(260, 4),
  388. OnNext(310, 5),
  389. OnNext(340, 6),
  390. OnNext(410, 7),
  391. OnNext(420, 8),
  392. OnNext(470, 9),
  393. OnNext(550, 10),
  394. OnCompleted<int>(590)
  395. );
  396. var ys = scheduler.CreateHotObservable(
  397. OnNext(255, 50),
  398. OnNext(330, 100),
  399. OnNext(350, 50),
  400. OnNext(400, 90),
  401. OnCompleted<int>(900)
  402. );
  403. var ex = new Exception();
  404. var res = scheduler.Start(() =>
  405. xs.Window<int, int, int>(ys, x => { throw ex; }).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  406. );
  407. res.Messages.AssertEqual(
  408. OnError<string>(255, ex)
  409. );
  410. xs.Subscriptions.AssertEqual(
  411. Subscribe(200, 255)
  412. );
  413. ys.Subscriptions.AssertEqual(
  414. Subscribe(200, 255)
  415. );
  416. }
  417. [TestMethod]
  418. public void Window_OpeningClosings_Dispose()
  419. {
  420. var scheduler = new TestScheduler();
  421. var xs = scheduler.CreateHotObservable(
  422. OnNext(90, 1),
  423. OnNext(180, 2),
  424. OnNext(250, 3),
  425. OnNext(260, 4),
  426. OnNext(310, 5),
  427. OnNext(340, 6),
  428. OnNext(410, 7),
  429. OnNext(420, 8),
  430. OnNext(470, 9),
  431. OnNext(550, 10),
  432. OnCompleted<int>(590)
  433. );
  434. var ys = scheduler.CreateHotObservable(
  435. OnNext(255, 50),
  436. OnNext(330, 100),
  437. OnNext(350, 50),
  438. OnNext(400, 90),
  439. OnCompleted<int>(900)
  440. );
  441. var res = scheduler.Start(() =>
  442. xs.Window(ys, x => Observable.Timer(TimeSpan.FromTicks(x), scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(),
  443. 415
  444. );
  445. res.Messages.AssertEqual(
  446. OnNext(260, "0 4"),
  447. OnNext(340, "1 6"),
  448. OnNext(410, "1 7"),
  449. OnNext(410, "3 7")
  450. );
  451. xs.Subscriptions.AssertEqual(
  452. Subscribe(200, 415)
  453. );
  454. ys.Subscriptions.AssertEqual(
  455. Subscribe(200, 415)
  456. );
  457. }
  458. [TestMethod]
  459. public void Window_OpeningClosings_Data_Error()
  460. {
  461. var scheduler = new TestScheduler();
  462. var ex = new Exception();
  463. var xs = scheduler.CreateHotObservable(
  464. OnNext(90, 1),
  465. OnNext(180, 2),
  466. OnNext(250, 3),
  467. OnNext(260, 4),
  468. OnNext(310, 5),
  469. OnNext(340, 6),
  470. OnNext(410, 7),
  471. OnError<int>(415, ex)
  472. );
  473. var ys = scheduler.CreateHotObservable(
  474. OnNext(255, 50),
  475. OnNext(330, 100),
  476. OnNext(350, 50),
  477. OnNext(400, 90),
  478. OnCompleted<int>(900)
  479. );
  480. var res = scheduler.Start(() =>
  481. xs.Window(ys, x => Observable.Timer(TimeSpan.FromTicks(x), scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  482. );
  483. res.Messages.AssertEqual(
  484. OnNext(260, "0 4"),
  485. OnNext(340, "1 6"),
  486. OnNext(410, "1 7"),
  487. OnNext(410, "3 7"),
  488. OnError<string>(415, ex)
  489. );
  490. xs.Subscriptions.AssertEqual(
  491. Subscribe(200, 415)
  492. );
  493. ys.Subscriptions.AssertEqual(
  494. Subscribe(200, 415)
  495. );
  496. }
  497. [TestMethod]
  498. public void Window_OpeningClosings_Window_Error()
  499. {
  500. var scheduler = new TestScheduler();
  501. var ex = new Exception();
  502. var xs = scheduler.CreateHotObservable(
  503. OnNext(90, 1),
  504. OnNext(180, 2),
  505. OnNext(250, 3),
  506. OnNext(260, 4),
  507. OnNext(310, 5),
  508. OnNext(340, 6),
  509. OnNext(410, 7),
  510. OnNext(420, 8),
  511. OnNext(470, 9),
  512. OnNext(550, 10),
  513. OnCompleted<int>(590)
  514. );
  515. var ys = scheduler.CreateHotObservable(
  516. OnNext(255, 50),
  517. OnNext(330, 100),
  518. OnNext(350, 50),
  519. OnNext(400, 90),
  520. OnError<int>(415, ex)
  521. );
  522. var res = scheduler.Start(() =>
  523. xs.Window(ys, x => Observable.Timer(TimeSpan.FromTicks(x), scheduler)).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  524. );
  525. res.Messages.AssertEqual(
  526. OnNext(260, "0 4"),
  527. OnNext(340, "1 6"),
  528. OnNext(410, "1 7"),
  529. OnNext(410, "3 7"),
  530. OnError<string>(415, ex)
  531. );
  532. xs.Subscriptions.AssertEqual(
  533. Subscribe(200, 415)
  534. );
  535. ys.Subscriptions.AssertEqual(
  536. Subscribe(200, 415)
  537. );
  538. }
  539. [TestMethod]
  540. public void Window_Boundaries_Simple()
  541. {
  542. var scheduler = new TestScheduler();
  543. var xs = scheduler.CreateHotObservable(
  544. OnNext(90, 1),
  545. OnNext(180, 2),
  546. OnNext(250, 3),
  547. OnNext(260, 4),
  548. OnNext(310, 5),
  549. OnNext(340, 6),
  550. OnNext(410, 7),
  551. OnNext(420, 8),
  552. OnNext(470, 9),
  553. OnNext(550, 10),
  554. OnCompleted<int>(590)
  555. );
  556. var ys = scheduler.CreateHotObservable(
  557. OnNext(255, true),
  558. OnNext(330, true),
  559. OnNext(350, true),
  560. OnNext(400, true),
  561. OnNext(500, true),
  562. OnCompleted<bool>(900)
  563. );
  564. var res = scheduler.Start(() =>
  565. xs.Window(ys).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  566. );
  567. res.Messages.AssertEqual(
  568. OnNext(250, "0 3"),
  569. OnNext(260, "1 4"),
  570. OnNext(310, "1 5"),
  571. OnNext(340, "2 6"),
  572. OnNext(410, "4 7"),
  573. OnNext(420, "4 8"),
  574. OnNext(470, "4 9"),
  575. OnNext(550, "5 10"),
  576. OnCompleted<string>(590)
  577. );
  578. xs.Subscriptions.AssertEqual(
  579. Subscribe(200, 590)
  580. );
  581. ys.Subscriptions.AssertEqual(
  582. Subscribe(200, 590)
  583. );
  584. }
  585. [TestMethod]
  586. public void Window_Boundaries_OnCompletedBoundaries()
  587. {
  588. var scheduler = new TestScheduler();
  589. var xs = scheduler.CreateHotObservable(
  590. OnNext(90, 1),
  591. OnNext(180, 2),
  592. OnNext(250, 3),
  593. OnNext(260, 4),
  594. OnNext(310, 5),
  595. OnNext(340, 6),
  596. OnNext(410, 7),
  597. OnNext(420, 8),
  598. OnNext(470, 9),
  599. OnNext(550, 10),
  600. OnCompleted<int>(590)
  601. );
  602. var ys = scheduler.CreateHotObservable(
  603. OnNext(255, true),
  604. OnNext(330, true),
  605. OnNext(350, true),
  606. OnCompleted<bool>(400)
  607. );
  608. var res = scheduler.Start(() =>
  609. xs.Window(ys).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  610. );
  611. res.Messages.AssertEqual(
  612. OnNext(250, "0 3"),
  613. OnNext(260, "1 4"),
  614. OnNext(310, "1 5"),
  615. OnNext(340, "2 6"),
  616. OnCompleted<string>(400)
  617. );
  618. xs.Subscriptions.AssertEqual(
  619. Subscribe(200, 400)
  620. );
  621. ys.Subscriptions.AssertEqual(
  622. Subscribe(200, 400)
  623. );
  624. }
  625. [TestMethod]
  626. public void Window_Boundaries_OnErrorSource()
  627. {
  628. var ex = new Exception();
  629. var scheduler = new TestScheduler();
  630. var xs = scheduler.CreateHotObservable(
  631. OnNext(90, 1),
  632. OnNext(180, 2),
  633. OnNext(250, 3),
  634. OnNext(260, 4),
  635. OnNext(310, 5),
  636. OnNext(340, 6),
  637. OnNext(380, 7),
  638. OnError<int>(400, ex)
  639. );
  640. var ys = scheduler.CreateHotObservable(
  641. OnNext(255, true),
  642. OnNext(330, true),
  643. OnNext(350, true),
  644. OnCompleted<bool>(500)
  645. );
  646. var res = scheduler.Start(() =>
  647. xs.Window(ys).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  648. );
  649. res.Messages.AssertEqual(
  650. OnNext(250, "0 3"),
  651. OnNext(260, "1 4"),
  652. OnNext(310, "1 5"),
  653. OnNext(340, "2 6"),
  654. OnNext(380, "3 7"),
  655. OnError<string>(400, ex)
  656. );
  657. xs.Subscriptions.AssertEqual(
  658. Subscribe(200, 400)
  659. );
  660. ys.Subscriptions.AssertEqual(
  661. Subscribe(200, 400)
  662. );
  663. }
  664. [TestMethod]
  665. public void Window_Boundaries_OnErrorBoundaries()
  666. {
  667. var ex = new Exception();
  668. var scheduler = new TestScheduler();
  669. var xs = scheduler.CreateHotObservable(
  670. OnNext(90, 1),
  671. OnNext(180, 2),
  672. OnNext(250, 3),
  673. OnNext(260, 4),
  674. OnNext(310, 5),
  675. OnNext(340, 6),
  676. OnNext(410, 7),
  677. OnNext(420, 8),
  678. OnNext(470, 9),
  679. OnNext(550, 10),
  680. OnCompleted<int>(590)
  681. );
  682. var ys = scheduler.CreateHotObservable(
  683. OnNext(255, true),
  684. OnNext(330, true),
  685. OnNext(350, true),
  686. OnError<bool>(400, ex)
  687. );
  688. var res = scheduler.Start(() =>
  689. xs.Window(ys).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  690. );
  691. res.Messages.AssertEqual(
  692. OnNext(250, "0 3"),
  693. OnNext(260, "1 4"),
  694. OnNext(310, "1 5"),
  695. OnNext(340, "2 6"),
  696. OnError<string>(400, ex)
  697. );
  698. xs.Subscriptions.AssertEqual(
  699. Subscribe(200, 400)
  700. );
  701. ys.Subscriptions.AssertEqual(
  702. Subscribe(200, 400)
  703. );
  704. }
  705. [TestMethod]
  706. public void WindowWithCount_ArgumentChecking()
  707. {
  708. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), 1, 1));
  709. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, 0, 1));
  710. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, 1, 0));
  711. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), 1));
  712. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, 0));
  713. }
  714. [TestMethod]
  715. public void WindowWithCount_Basic()
  716. {
  717. var scheduler = new TestScheduler();
  718. var xs = scheduler.CreateHotObservable(
  719. OnNext(100, 1),
  720. OnNext(210, 2),
  721. OnNext(240, 3),
  722. OnNext(280, 4),
  723. OnNext(320, 5),
  724. OnNext(350, 6),
  725. OnNext(380, 7),
  726. OnNext(420, 8),
  727. OnNext(470, 9),
  728. OnCompleted<int>(600)
  729. );
  730. var res = scheduler.Start(() =>
  731. xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  732. );
  733. res.Messages.AssertEqual(
  734. OnNext(210, "0 2"),
  735. OnNext(240, "0 3"),
  736. OnNext(280, "0 4"),
  737. OnNext(280, "1 4"),
  738. OnNext(320, "1 5"),
  739. OnNext(350, "1 6"),
  740. OnNext(350, "2 6"),
  741. OnNext(380, "2 7"),
  742. OnNext(420, "2 8"),
  743. OnNext(420, "3 8"),
  744. OnNext(470, "3 9"),
  745. OnCompleted<string>(600)
  746. );
  747. xs.Subscriptions.AssertEqual(
  748. Subscribe(200, 600)
  749. );
  750. }
  751. [TestMethod]
  752. public void WindowWithCount_InnerTimings()
  753. {
  754. var scheduler = new TestScheduler();
  755. var xs = scheduler.CreateHotObservable(
  756. OnNext(100, 1),
  757. OnNext(210, 2),
  758. OnNext(240, 3),
  759. OnNext(280, 4),
  760. OnNext(320, 5),
  761. OnNext(350, 6),
  762. OnNext(380, 7),
  763. OnNext(420, 8),
  764. OnNext(470, 9),
  765. OnCompleted<int>(600)
  766. );
  767. var res = default(IObservable<IObservable<int>>);
  768. var outerSubscription = default(IDisposable);
  769. var innerSubscriptions = new List<IDisposable>();
  770. var windows = new List<IObservable<int>>();
  771. var observers = new List<ITestableObserver<int>>();
  772. scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2));
  773. scheduler.ScheduleAbsolute(Subscribed, () =>
  774. {
  775. outerSubscription = res.Subscribe(
  776. window =>
  777. {
  778. var result = scheduler.CreateObserver<int>();
  779. windows.Add(window);
  780. observers.Add(result);
  781. scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result)));
  782. }
  783. );
  784. });
  785. scheduler.Start();
  786. Assert.Equal(5, observers.Count);
  787. observers[0].Messages.AssertEqual(
  788. OnNext(210, 2),
  789. OnNext(240, 3),
  790. OnNext(280, 4),
  791. OnCompleted<int>(280)
  792. );
  793. observers[1].Messages.AssertEqual(
  794. OnNext(280, 4),
  795. OnNext(320, 5),
  796. OnNext(350, 6),
  797. OnCompleted<int>(350)
  798. );
  799. observers[2].Messages.AssertEqual(
  800. OnNext(350, 6),
  801. OnNext(380, 7),
  802. OnNext(420, 8),
  803. OnCompleted<int>(420)
  804. );
  805. observers[3].Messages.AssertEqual(
  806. OnNext(420, 8),
  807. OnNext(470, 9),
  808. OnCompleted<int>(600)
  809. );
  810. observers[4].Messages.AssertEqual(
  811. OnCompleted<int>(600)
  812. );
  813. xs.Subscriptions.AssertEqual(
  814. Subscribe(200, 600)
  815. );
  816. }
  817. [TestMethod]
  818. public void WindowWithCount_InnerTimings_DisposeOuter()
  819. {
  820. var scheduler = new TestScheduler();
  821. var xs = scheduler.CreateHotObservable(
  822. OnNext(100, 1),
  823. OnNext(210, 2),
  824. OnNext(240, 3),
  825. OnNext(280, 4),
  826. OnNext(320, 5),
  827. OnNext(350, 6),
  828. OnNext(380, 7),
  829. OnNext(420, 8),
  830. OnNext(470, 9),
  831. OnCompleted<int>(600)
  832. );
  833. var res = default(IObservable<IObservable<int>>);
  834. var outerSubscription = default(IDisposable);
  835. var innerSubscriptions = new List<IDisposable>();
  836. var windows = new List<IObservable<int>>();
  837. var observers = new List<ITestableObserver<int>>();
  838. var windowCreationTimes = new List<long>();
  839. scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2));
  840. scheduler.ScheduleAbsolute(Subscribed, () =>
  841. {
  842. outerSubscription = res.Subscribe(
  843. window =>
  844. {
  845. windowCreationTimes.Add(scheduler.Clock);
  846. var result = scheduler.CreateObserver<int>();
  847. windows.Add(window);
  848. observers.Add(result);
  849. scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result)));
  850. }
  851. );
  852. });
  853. scheduler.ScheduleAbsolute(400, () =>
  854. {
  855. outerSubscription.Dispose();
  856. });
  857. scheduler.Start();
  858. Assert.True(windowCreationTimes.Last() < 400);
  859. Assert.Equal(4, observers.Count);
  860. observers[0].Messages.AssertEqual(
  861. OnNext(210, 2),
  862. OnNext(240, 3),
  863. OnNext(280, 4),
  864. OnCompleted<int>(280)
  865. );
  866. observers[1].Messages.AssertEqual(
  867. OnNext(280, 4),
  868. OnNext(320, 5),
  869. OnNext(350, 6),
  870. OnCompleted<int>(350)
  871. );
  872. observers[2].Messages.AssertEqual(
  873. OnNext(350, 6),
  874. OnNext(380, 7),
  875. OnNext(420, 8),
  876. OnCompleted<int>(420)
  877. );
  878. observers[3].Messages.AssertEqual(
  879. OnNext(420, 8),
  880. OnNext(470, 9),
  881. OnCompleted<int>(600)
  882. );
  883. xs.Subscriptions.AssertEqual(
  884. Subscribe(200, 600)
  885. );
  886. }
  887. [TestMethod]
  888. public void WindowWithCount_InnerTimings_DisposeOuterAndInners()
  889. {
  890. var scheduler = new TestScheduler();
  891. var xs = scheduler.CreateHotObservable(
  892. OnNext(100, 1),
  893. OnNext(210, 2),
  894. OnNext(240, 3),
  895. OnNext(280, 4),
  896. OnNext(320, 5),
  897. OnNext(350, 6),
  898. OnNext(380, 7),
  899. OnNext(420, 8),
  900. OnNext(470, 9),
  901. OnCompleted<int>(600)
  902. );
  903. var res = default(IObservable<IObservable<int>>);
  904. var outerSubscription = default(IDisposable);
  905. var innerSubscriptions = new List<IDisposable>();
  906. var windows = new List<IObservable<int>>();
  907. var observers = new List<ITestableObserver<int>>();
  908. var windowCreationTimes = new List<long>();
  909. scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2));
  910. scheduler.ScheduleAbsolute(Subscribed, () =>
  911. {
  912. outerSubscription = res.Subscribe(
  913. window =>
  914. {
  915. windowCreationTimes.Add(scheduler.Clock);
  916. var result = scheduler.CreateObserver<int>();
  917. windows.Add(window);
  918. observers.Add(result);
  919. scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result)));
  920. }
  921. );
  922. });
  923. scheduler.ScheduleAbsolute(400, () =>
  924. {
  925. outerSubscription.Dispose();
  926. foreach (var d in innerSubscriptions)
  927. {
  928. d.Dispose();
  929. }
  930. });
  931. scheduler.Start();
  932. Assert.True(windowCreationTimes.Last() < 400);
  933. Assert.Equal(4, observers.Count);
  934. observers[0].Messages.AssertEqual(
  935. OnNext(210, 2),
  936. OnNext(240, 3),
  937. OnNext(280, 4),
  938. OnCompleted<int>(280)
  939. );
  940. observers[1].Messages.AssertEqual(
  941. OnNext(280, 4),
  942. OnNext(320, 5),
  943. OnNext(350, 6),
  944. OnCompleted<int>(350)
  945. );
  946. observers[2].Messages.AssertEqual(
  947. OnNext(350, 6),
  948. OnNext(380, 7)
  949. );
  950. observers[3].Messages.AssertEqual(
  951. );
  952. xs.Subscriptions.AssertEqual(
  953. Subscribe(200, 400)
  954. );
  955. }
  956. [TestMethod]
  957. public void WindowWithCount_Disposed()
  958. {
  959. var scheduler = new TestScheduler();
  960. var xs = scheduler.CreateHotObservable(
  961. OnNext(100, 1),
  962. OnNext(210, 2),
  963. OnNext(240, 3),
  964. OnNext(280, 4),
  965. OnNext(320, 5),
  966. OnNext(350, 6),
  967. OnNext(380, 7),
  968. OnNext(420, 8),
  969. OnNext(470, 9),
  970. OnCompleted<int>(600)
  971. );
  972. var res = scheduler.Start(() =>
  973. xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(), 370
  974. );
  975. res.Messages.AssertEqual(
  976. OnNext(210, "0 2"),
  977. OnNext(240, "0 3"),
  978. OnNext(280, "0 4"),
  979. OnNext(280, "1 4"),
  980. OnNext(320, "1 5"),
  981. OnNext(350, "1 6"),
  982. OnNext(350, "2 6")
  983. );
  984. xs.Subscriptions.AssertEqual(
  985. Subscribe(200, 370)
  986. );
  987. }
  988. [TestMethod]
  989. public void WindowWithCount_Error()
  990. {
  991. var scheduler = new TestScheduler();
  992. var ex = new Exception();
  993. var xs = scheduler.CreateHotObservable(
  994. OnNext(100, 1),
  995. OnNext(210, 2),
  996. OnNext(240, 3),
  997. OnNext(280, 4),
  998. OnNext(320, 5),
  999. OnNext(350, 6),
  1000. OnNext(380, 7),
  1001. OnNext(420, 8),
  1002. OnNext(470, 9),
  1003. OnError<int>(600, ex)
  1004. );
  1005. var res = scheduler.Start(() =>
  1006. xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  1007. );
  1008. res.Messages.AssertEqual(
  1009. OnNext(210, "0 2"),
  1010. OnNext(240, "0 3"),
  1011. OnNext(280, "0 4"),
  1012. OnNext(280, "1 4"),
  1013. OnNext(320, "1 5"),
  1014. OnNext(350, "1 6"),
  1015. OnNext(350, "2 6"),
  1016. OnNext(380, "2 7"),
  1017. OnNext(420, "2 8"),
  1018. OnNext(420, "3 8"),
  1019. OnNext(470, "3 9"),
  1020. OnError<string>(600, ex)
  1021. );
  1022. xs.Subscriptions.AssertEqual(
  1023. Subscribe(200, 600)
  1024. );
  1025. }
  1026. [TestMethod]
  1027. public void WindowWithCount_Default()
  1028. {
  1029. Observable.Range(1, 10).Window(3).Skip(1).First().SequenceEqual(new[] { 4, 5, 6 }.ToObservable());
  1030. Observable.Range(1, 10).Window(3).Skip(1).First().SequenceEqual(new[] { 4, 5, 6 }.ToObservable());
  1031. Observable.Range(1, 10).Window(3, 2).Skip(1).First().SequenceEqual(new[] { 3, 4, 5 }.ToObservable());
  1032. }
  1033. #endregion
  1034. #region + Timed +
  1035. [TestMethod]
  1036. public void Window_Time_Basic()
  1037. {
  1038. var scheduler = new TestScheduler();
  1039. var xs = scheduler.CreateHotObservable(
  1040. OnNext(150, 1),
  1041. OnNext(210, 2),
  1042. OnNext(240, 3),
  1043. OnNext(270, 4),
  1044. OnNext(320, 5),
  1045. OnNext(360, 6),
  1046. OnNext(390, 7),
  1047. OnNext(410, 8),
  1048. OnNext(460, 9),
  1049. OnNext(470, 10),
  1050. OnCompleted<int>(490)
  1051. );
  1052. var res = scheduler.Start(() =>
  1053. xs.Window(TimeSpan.FromTicks(100), scheduler).Select((ys, i) => ys.Select(y => i + " " + y).Concat(Observable.Return(i + " end"))).Merge()
  1054. );
  1055. res.Messages.AssertEqual(
  1056. OnNext(210, "0 2"),
  1057. OnNext(240, "0 3"),
  1058. OnNext(270, "0 4"),
  1059. OnNext(300, "0 end"),
  1060. OnNext(320, "1 5"),
  1061. OnNext(360, "1 6"),
  1062. OnNext(390, "1 7"),
  1063. OnNext(400, "1 end"),
  1064. OnNext(410, "2 8"),
  1065. OnNext(460, "2 9"),
  1066. OnNext(470, "2 10"),
  1067. OnNext(490, "2 end"),
  1068. OnCompleted<string>(490)
  1069. );
  1070. xs.Subscriptions.AssertEqual(
  1071. Subscribe(200, 490)
  1072. );
  1073. }
  1074. [TestMethod]
  1075. public void Window_Time_Basic_Periodic()
  1076. {
  1077. var scheduler = new PeriodicTestScheduler();
  1078. var xs = scheduler.CreateHotObservable(
  1079. OnNext(150, 1),
  1080. OnNext(210, 2),
  1081. OnNext(240, 3),
  1082. OnNext(270, 4),
  1083. OnNext(320, 5),
  1084. OnNext(360, 6),
  1085. OnNext(390, 7),
  1086. OnNext(410, 8),
  1087. OnNext(460, 9),
  1088. OnNext(470, 10),
  1089. OnCompleted<int>(490)
  1090. );
  1091. var res = scheduler.Start(() =>
  1092. xs.Window(TimeSpan.FromTicks(100), scheduler).Select((ys, i) => ys.Select(y => i + " " + y).Concat(Observable.Return(i + " end"))).Merge()
  1093. );
  1094. res.Messages.AssertEqual(
  1095. OnNext(210, "0 2"),
  1096. OnNext(240, "0 3"),
  1097. OnNext(270, "0 4"),
  1098. OnNext(300, "0 end"),
  1099. OnNext(320, "1 5"),
  1100. OnNext(360, "1 6"),
  1101. OnNext(390, "1 7"),
  1102. OnNext(400, "1 end"),
  1103. OnNext(410, "2 8"),
  1104. OnNext(460, "2 9"),
  1105. OnNext(470, "2 10"),
  1106. OnNext(490, "2 end"),
  1107. OnCompleted<string>(490)
  1108. );
  1109. xs.Subscriptions.AssertEqual(
  1110. Subscribe(200, 490)
  1111. );
  1112. #if !WINDOWS
  1113. scheduler.Timers.AssertEqual(
  1114. new TimerRun(200, 490) { 300, 400 }
  1115. );
  1116. #endif
  1117. }
  1118. [TestMethod]
  1119. public void Window_Time_Basic_Periodic_Error()
  1120. {
  1121. var ex = new Exception();
  1122. var scheduler = new PeriodicTestScheduler();
  1123. var xs = scheduler.CreateHotObservable(
  1124. OnNext(150, 1),
  1125. OnNext(210, 2),
  1126. OnNext(240, 3),
  1127. OnNext(270, 4),
  1128. OnNext(320, 5),
  1129. OnNext(360, 6),
  1130. OnNext(390, 7),
  1131. OnNext(410, 8),
  1132. OnError<int>(460, ex)
  1133. );
  1134. var res = scheduler.Start(() =>
  1135. xs.Window(TimeSpan.FromTicks(100), scheduler).Select((ys, i) => ys.Select(y => i + " " + y).Concat(Observable.Return(i + " end"))).Merge()
  1136. );
  1137. res.Messages.AssertEqual(
  1138. OnNext(210, "0 2"),
  1139. OnNext(240, "0 3"),
  1140. OnNext(270, "0 4"),
  1141. OnNext(300, "0 end"),
  1142. OnNext(320, "1 5"),
  1143. OnNext(360, "1 6"),
  1144. OnNext(390, "1 7"),
  1145. OnNext(400, "1 end"),
  1146. OnNext(410, "2 8"),
  1147. OnError<string>(460, ex)
  1148. );
  1149. xs.Subscriptions.AssertEqual(
  1150. Subscribe(200, 460)
  1151. );
  1152. #if !WINDOWS
  1153. scheduler.Timers.AssertEqual(
  1154. new TimerRun(200, 460) { 300, 400 }
  1155. );
  1156. #endif
  1157. }
  1158. [TestMethod]
  1159. public void Window_Time_Basic_Both()
  1160. {
  1161. var scheduler = new TestScheduler();
  1162. var xs = scheduler.CreateHotObservable(
  1163. OnNext(150, 1),
  1164. OnNext(210, 2),
  1165. OnNext(240, 3),
  1166. OnNext(270, 4),
  1167. OnNext(320, 5),
  1168. OnNext(360, 6),
  1169. OnNext(390, 7),
  1170. OnNext(410, 8),
  1171. OnNext(460, 9),
  1172. OnNext(470, 10),
  1173. OnCompleted<int>(490)
  1174. );
  1175. var res = scheduler.Start(() =>
  1176. xs.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(50), scheduler).Select((ys, i) => ys.Select(y => i + " " + y).Concat(Observable.Return(i + " end"))).Merge()
  1177. );
  1178. res.Messages.AssertEqual(
  1179. OnNext(210, "0 2"),
  1180. OnNext(240, "0 3"),
  1181. OnNext(270, "0 4"),
  1182. OnNext(270, "1 4"),
  1183. OnNext(300, "0 end"),
  1184. OnNext(320, "1 5"),
  1185. OnNext(320, "2 5"),
  1186. OnNext(350, "1 end"),
  1187. OnNext(360, "2 6"),
  1188. OnNext(360, "3 6"),
  1189. OnNext(390, "2 7"),
  1190. OnNext(390, "3 7"),
  1191. OnNext(400, "2 end"),
  1192. OnNext(410, "3 8"),
  1193. OnNext(410, "4 8"),
  1194. OnNext(450, "3 end"),
  1195. OnNext(460, "4 9"),
  1196. OnNext(460, "5 9"),
  1197. OnNext(470, "4 10"),
  1198. OnNext(470, "5 10"),
  1199. OnNext(490, "4 end"),
  1200. OnNext(490, "5 end"),
  1201. OnCompleted<string>(490)
  1202. );
  1203. xs.Subscriptions.AssertEqual(
  1204. Subscribe(200, 490)
  1205. );
  1206. }
  1207. [TestMethod]
  1208. public void WindowWithTime_ArgumentChecking()
  1209. {
  1210. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), DummyScheduler.Instance));
  1211. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1), DummyScheduler.Instance));
  1212. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  1213. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), null));
  1214. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1)));
  1215. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1)));
  1216. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1)));
  1217. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), TimeSpan.FromTicks(1), DummyScheduler.Instance));
  1218. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  1219. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), null));
  1220. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), TimeSpan.FromTicks(1)));
  1221. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1)));
  1222. }
  1223. [TestMethod]
  1224. public void WindowWithTime_Basic1()
  1225. {
  1226. var scheduler = new TestScheduler();
  1227. var xs = scheduler.CreateHotObservable(
  1228. OnNext(100, 1),
  1229. OnNext(210, 2),
  1230. OnNext(240, 3),
  1231. OnNext(280, 4),
  1232. OnNext(320, 5),
  1233. OnNext(350, 6),
  1234. OnNext(380, 7),
  1235. OnNext(420, 8),
  1236. OnNext(470, 9),
  1237. OnCompleted<int>(600)
  1238. );
  1239. var res = scheduler.Start(() =>
  1240. xs.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  1241. );
  1242. res.Messages.AssertEqual(
  1243. OnNext(210, "0 2"),
  1244. OnNext(240, "0 3"),
  1245. OnNext(280, "0 4"),
  1246. OnNext(280, "1 4"),
  1247. OnNext(320, "1 5"),
  1248. OnNext(350, "1 6"),
  1249. OnNext(350, "2 6"),
  1250. OnNext(380, "2 7"),
  1251. OnNext(420, "2 8"),
  1252. OnNext(420, "3 8"),
  1253. OnNext(470, "3 9"),
  1254. OnCompleted<string>(600)
  1255. );
  1256. xs.Subscriptions.AssertEqual(
  1257. Subscribe(200, 600)
  1258. );
  1259. }
  1260. [TestMethod]
  1261. public void WindowWithTime_Basic2()
  1262. {
  1263. var scheduler = new TestScheduler();
  1264. var xs = scheduler.CreateHotObservable(
  1265. OnNext(100, 1),
  1266. OnNext(210, 2),
  1267. OnNext(240, 3),
  1268. OnNext(280, 4),
  1269. OnNext(320, 5),
  1270. OnNext(350, 6),
  1271. OnNext(380, 7),
  1272. OnNext(420, 8),
  1273. OnNext(470, 9),
  1274. OnCompleted<int>(600)
  1275. );
  1276. var res = scheduler.Start(() =>
  1277. xs.Window(TimeSpan.FromTicks(70), TimeSpan.FromTicks(100), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  1278. );
  1279. res.Messages.AssertEqual(
  1280. OnNext(210, "0 2"),
  1281. OnNext(240, "0 3"),
  1282. OnNext(320, "1 5"),
  1283. OnNext(350, "1 6"),
  1284. OnNext(420, "2 8"),
  1285. OnNext(470, "2 9"),
  1286. OnCompleted<string>(600)
  1287. );
  1288. xs.Subscriptions.AssertEqual(
  1289. Subscribe(200, 600)
  1290. );
  1291. }
  1292. [TestMethod]
  1293. public void WindowWithTime_Error()
  1294. {
  1295. var scheduler = new TestScheduler();
  1296. var ex = new Exception();
  1297. var xs = scheduler.CreateHotObservable(
  1298. OnNext(100, 1),
  1299. OnNext(210, 2),
  1300. OnNext(240, 3),
  1301. OnNext(280, 4),
  1302. OnNext(320, 5),
  1303. OnNext(350, 6),
  1304. OnNext(380, 7),
  1305. OnNext(420, 8),
  1306. OnNext(470, 9),
  1307. OnError<int>(600, ex)
  1308. );
  1309. var res = scheduler.Start(() =>
  1310. xs.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  1311. );
  1312. res.Messages.AssertEqual(
  1313. OnNext(210, "0 2"),
  1314. OnNext(240, "0 3"),
  1315. OnNext(280, "0 4"),
  1316. OnNext(280, "1 4"),
  1317. OnNext(320, "1 5"),
  1318. OnNext(350, "1 6"),
  1319. OnNext(350, "2 6"),
  1320. OnNext(380, "2 7"),
  1321. OnNext(420, "2 8"),
  1322. OnNext(420, "3 8"),
  1323. OnNext(470, "3 9"),
  1324. OnError<string>(600, ex)
  1325. );
  1326. xs.Subscriptions.AssertEqual(
  1327. Subscribe(200, 600)
  1328. );
  1329. }
  1330. [TestMethod]
  1331. public void WindowWithTime_Disposed()
  1332. {
  1333. var scheduler = new TestScheduler();
  1334. var xs = scheduler.CreateHotObservable(
  1335. OnNext(100, 1),
  1336. OnNext(210, 2),
  1337. OnNext(240, 3),
  1338. OnNext(280, 4),
  1339. OnNext(320, 5),
  1340. OnNext(350, 6),
  1341. OnNext(380, 7),
  1342. OnNext(420, 8),
  1343. OnNext(470, 9),
  1344. OnCompleted<int>(600)
  1345. );
  1346. var res = scheduler.Start(() =>
  1347. xs.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(),
  1348. 370
  1349. );
  1350. res.Messages.AssertEqual(
  1351. OnNext(210, "0 2"),
  1352. OnNext(240, "0 3"),
  1353. OnNext(280, "0 4"),
  1354. OnNext(280, "1 4"),
  1355. OnNext(320, "1 5"),
  1356. OnNext(350, "1 6"),
  1357. OnNext(350, "2 6")
  1358. );
  1359. xs.Subscriptions.AssertEqual(
  1360. Subscribe(200, 370)
  1361. );
  1362. }
  1363. [TestMethod]
  1364. public void WindowWithTime_Basic_Same()
  1365. {
  1366. var scheduler = new TestScheduler();
  1367. var xs = scheduler.CreateHotObservable(
  1368. OnNext(100, 1),
  1369. OnNext(210, 2),
  1370. OnNext(240, 3),
  1371. OnNext(280, 4),
  1372. OnNext(320, 5),
  1373. OnNext(350, 6),
  1374. OnNext(380, 7),
  1375. OnNext(420, 8),
  1376. OnNext(470, 9),
  1377. OnCompleted<int>(600)
  1378. );
  1379. var res = scheduler.Start(() =>
  1380. xs.Window(TimeSpan.FromTicks(100), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  1381. );
  1382. res.Messages.AssertEqual(
  1383. OnNext(210, "0 2"),
  1384. OnNext(240, "0 3"),
  1385. OnNext(280, "0 4"),
  1386. OnNext(320, "1 5"),
  1387. OnNext(350, "1 6"),
  1388. OnNext(380, "1 7"),
  1389. OnNext(420, "2 8"),
  1390. OnNext(470, "2 9"),
  1391. OnCompleted<string>(600)
  1392. );
  1393. xs.Subscriptions.AssertEqual(
  1394. Subscribe(200, 600)
  1395. );
  1396. }
  1397. [TestMethod]
  1398. public void WindowWithTime_Default()
  1399. {
  1400. Observable.Range(0, 10).Window(TimeSpan.FromDays(1), TimeSpan.FromDays(1)).SelectMany(Observable.ToList).First().AssertEqual(Enumerable.Range(0, 10));
  1401. Observable.Range(0, 10).Window(TimeSpan.FromDays(1)).SelectMany(Observable.ToList).First().AssertEqual(Enumerable.Range(0, 10));
  1402. }
  1403. [TestMethod]
  1404. public void WindowWithTimeOrCount_ArgumentChecking()
  1405. {
  1406. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), TimeSpan.FromTicks(1), 1, DummyScheduler.Instance));
  1407. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), 1, DummyScheduler.Instance));
  1408. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), 0, DummyScheduler.Instance));
  1409. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), 1, null));
  1410. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), TimeSpan.FromTicks(1), 1));
  1411. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), 1));
  1412. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), 0));
  1413. }
  1414. [TestMethod]
  1415. public void WindowWithTimeOrCount_Basic()
  1416. {
  1417. var scheduler = new TestScheduler();
  1418. var xs = scheduler.CreateHotObservable(
  1419. OnNext(205, 1),
  1420. OnNext(210, 2),
  1421. OnNext(240, 3),
  1422. OnNext(280, 4),
  1423. OnNext(320, 5),
  1424. OnNext(350, 6),
  1425. OnNext(370, 7),
  1426. OnNext(420, 8),
  1427. OnNext(470, 9),
  1428. OnCompleted<int>(600)
  1429. );
  1430. var res = scheduler.Start(() =>
  1431. xs.Window(TimeSpan.FromTicks(70), 3, scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  1432. );
  1433. res.Messages.AssertEqual(
  1434. OnNext(205, "0 1"),
  1435. OnNext(210, "0 2"),
  1436. OnNext(240, "0 3"),
  1437. OnNext(280, "1 4"),
  1438. OnNext(320, "2 5"),
  1439. OnNext(350, "2 6"),
  1440. OnNext(370, "2 7"),
  1441. OnNext(420, "3 8"),
  1442. OnNext(470, "4 9"),
  1443. OnCompleted<string>(600)
  1444. );
  1445. xs.Subscriptions.AssertEqual(
  1446. Subscribe(200, 600)
  1447. );
  1448. }
  1449. [TestMethod]
  1450. public void WindowWithTimeOrCount_Error()
  1451. {
  1452. var scheduler = new TestScheduler();
  1453. var ex = new Exception();
  1454. var xs = scheduler.CreateHotObservable(
  1455. OnNext(205, 1),
  1456. OnNext(210, 2),
  1457. OnNext(240, 3),
  1458. OnNext(280, 4),
  1459. OnNext(320, 5),
  1460. OnNext(350, 6),
  1461. OnNext(370, 7),
  1462. OnNext(420, 8),
  1463. OnNext(470, 9),
  1464. OnError<int>(600, ex)
  1465. );
  1466. var res = scheduler.Start(() =>
  1467. xs.Window(TimeSpan.FromTicks(70), 3, scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  1468. );
  1469. res.Messages.AssertEqual(
  1470. OnNext(205, "0 1"),
  1471. OnNext(210, "0 2"),
  1472. OnNext(240, "0 3"),
  1473. OnNext(280, "1 4"),
  1474. OnNext(320, "2 5"),
  1475. OnNext(350, "2 6"),
  1476. OnNext(370, "2 7"),
  1477. OnNext(420, "3 8"),
  1478. OnNext(470, "4 9"),
  1479. OnError<string>(600, ex)
  1480. );
  1481. xs.Subscriptions.AssertEqual(
  1482. Subscribe(200, 600)
  1483. );
  1484. }
  1485. [TestMethod]
  1486. public void WindowWithTimeOrCount_Disposed()
  1487. {
  1488. var scheduler = new TestScheduler();
  1489. var xs = scheduler.CreateHotObservable(
  1490. OnNext(205, 1),
  1491. OnNext(210, 2),
  1492. OnNext(240, 3),
  1493. OnNext(280, 4),
  1494. OnNext(320, 5),
  1495. OnNext(350, 6),
  1496. OnNext(370, 7),
  1497. OnNext(420, 8),
  1498. OnNext(470, 9),
  1499. OnCompleted<int>(600)
  1500. );
  1501. var res = scheduler.Start(() =>
  1502. xs.Window(TimeSpan.FromTicks(70), 3, scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(),
  1503. 370
  1504. );
  1505. res.Messages.AssertEqual(
  1506. OnNext(205, "0 1"),
  1507. OnNext(210, "0 2"),
  1508. OnNext(240, "0 3"),
  1509. OnNext(280, "1 4"),
  1510. OnNext(320, "2 5"),
  1511. OnNext(350, "2 6"),
  1512. OnNext(370, "2 7")
  1513. );
  1514. xs.Subscriptions.AssertEqual(
  1515. Subscribe(200, 370)
  1516. );
  1517. }
  1518. [TestMethod]
  1519. public void WindowWithTimeOrCount_Default()
  1520. {
  1521. Observable.Range(1, 10).Window(TimeSpan.FromDays(1), 3).Skip(1).First().SequenceEqual(Observable.Range(4, 3));
  1522. }
  1523. #endregion
  1524. }
  1525. }