WindowTest.cs 57 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class WindowTest : ReactiveTest
  22. {
  23. #region + Observable +
  /// <summary>
  /// Checks that the Window overloads taking a closing selector, openings plus a
  /// closing selector, or a boundary sequence throw <see cref="ArgumentNullException"/>
  /// when any of their arguments is null.
  /// </summary>
  [Fact]
  public void Window_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var nullSource = default(IObservable<int>);
      var closingSelector = DummyFunc<IObservable<int>>.Instance;
      var nullClosingSelector = default(Func<IObservable<int>>);
      var perOpeningSelector = DummyFunc<int, IObservable<int>>.Instance;
      var nullPerOpeningSelector = default(Func<int, IObservable<int>>);

      // Window(source, windowClosingSelector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(nullSource, closingSelector));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(source, nullClosingSelector));

      // Window(source, windowOpenings, windowClosingSelector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(nullSource, source, perOpeningSelector));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(source, nullSource, perOpeningSelector));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(source, source, nullPerOpeningSelector));

      // Window(source, windowBoundaries)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(nullSource, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(source, nullSource));
  }
  /// <summary>
  /// Windows a hot source using a closing selector that returns a timer of
  /// 100, 200, 300, ... ticks (one more multiple per invocation) and checks the
  /// window index each element is reported under.
  /// </summary>
  [Fact]
  public void Window_Closings_Basic()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Multiplier for the next closing timer; incremented every time the selector runs.
      var timerMultiplier = 1;

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(() => Observable.Timer(TimeSpan.FromTicks((timerMultiplier++) * 100), scheduler));

          // Tag every element with the index of the window that delivered it.
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "0 4"),
          OnNext(310, "1 5"),
          OnNext(340, "1 6"),
          OnNext(410, "1 7"),
          OnNext(420, "1 8"),
          OnNext(470, "1 9"),
          OnNext(550, "2 10"),
          OnCompleted<string>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  /// <summary>
  /// Hands out four pre-built hot closing sequences, one per window, and checks
  /// both the window each element lands in and the subscription interval
  /// recorded on every closing sequence.
  /// </summary>
  [Fact]
  public void Window_Closings_InnerSubscriptions()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var firstClosing = scheduler.CreateHotObservable(
          OnNext(300, true),
          OnNext(350, false),
          OnCompleted<bool>(380)
      );
      var secondClosing = scheduler.CreateHotObservable(
          OnNext(400, true),
          OnNext(510, false),
          OnNext(620, false)
      );
      var thirdClosing = scheduler.CreateHotObservable(
          OnCompleted<bool>(500)
      );
      var fourthClosing = scheduler.CreateHotObservable(
          OnNext(600, true)
      );

      var closings = new[] { firstClosing, secondClosing, thirdClosing, fourthClosing };

      // Index of the closing sequence the selector returns next.
      var nextClosing = 0;

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(() => closings[nextClosing++]);
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "0 4"),
          OnNext(310, "1 5"),
          OnNext(340, "1 6"),
          OnNext(410, "2 7"),
          OnNext(420, "2 8"),
          OnNext(470, "2 9"),
          OnNext(550, "3 10"),
          OnCompleted<string>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );

      // Each closing sequence is expected to be subscribed from the end of the previous one.
      firstClosing.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
      secondClosing.Subscriptions.AssertEqual(
          Subscribe(300, 400)
      );
      thirdClosing.Subscriptions.AssertEqual(
          Subscribe(400, 500)
      );
      fourthClosing.Subscriptions.AssertEqual(
          Subscribe(500, 590)
      );
  }
  /// <summary>
  /// Same scenario as the timer-based closing test, but the closing sequence is
  /// an empty observable delayed by 100, 200, 300, ... ticks, so it carries no
  /// element at all. The expected output is the same window assignment.
  /// </summary>
  [Fact]
  public void Window_Closings_Empty()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Multiplier for the next delay; incremented every time the selector runs.
      var delayMultiplier = 1;

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(() => Observable.Empty<int>().Delay(TimeSpan.FromTicks((delayMultiplier++) * 100), scheduler));
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "0 4"),
          OnNext(310, "1 5"),
          OnNext(340, "1 6"),
          OnNext(410, "1 7"),
          OnNext(420, "1 8"),
          OnNext(470, "1 9"),
          OnNext(550, "2 10"),
          OnCompleted<string>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  /// <summary>
  /// Runs the timer-based closing scenario but disposes the subscription at
  /// tick 400. Only the elements up to tick 340 are expected, and the source
  /// subscription is expected to end at 400.
  /// </summary>
  [Fact]
  public void Window_Closings_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Multiplier for the next closing timer; incremented every time the selector runs.
      var timerMultiplier = 1;

      var observed = scheduler.Start(
          () =>
          {
              var opened = source.Window(() => Observable.Timer(TimeSpan.FromTicks((timerMultiplier++) * 100), scheduler));
              var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
              return tagged.Merge();
          },
          400
      );

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "0 4"),
          OnNext(310, "1 5"),
          OnNext(340, "1 6")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Runs the timer-based closing scenario with a source that fails at tick 590.
  /// The same exception instance is expected on the merged output at 590, after
  /// all elements were delivered.
  /// </summary>
  [Fact]
  public void Window_Closings_Error()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, failure)
      );

      // Multiplier for the next closing timer; incremented every time the selector runs.
      var timerMultiplier = 1;

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(() => Observable.Timer(TimeSpan.FromTicks((timerMultiplier++) * 100), scheduler));
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "0 4"),
          OnNext(310, "1 5"),
          OnNext(340, "1 6"),
          OnNext(410, "1 7"),
          OnNext(420, "1 8"),
          OnNext(470, "1 9"),
          OnNext(550, "2 10"),
          OnError<string>(590, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  /// <summary>
  /// Uses a closing selector that throws. The thrown exception is expected as an
  /// error at tick 200, with the source subscription starting and ending at 200.
  /// </summary>
  [Fact]
  public void Window_Closings_Throw()
  {
      var scheduler = new TestScheduler();

      // Exception thrown by the selector; the source's own error below is a different instance.
      var selectorFailure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, new Exception())
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window<int, int>(() => { throw selectorFailure; });
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnError<string>(200, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 200)
      );
  }
  /// <summary>
  /// Uses a closing selector whose returned sequence is
  /// <c>Observable.Throw</c> on the test scheduler. The error is expected on
  /// the output at tick 201 and the source subscription is expected to end at 201.
  /// </summary>
  [Fact]
  public void Window_Closings_WindowClose_Error()
  {
      var scheduler = new TestScheduler();

      // Exception raised by the closing sequence; the source's own error below is a different instance.
      var closingFailure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, new Exception())
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(() => Observable.Throw<int>(closingFailure, scheduler));
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnError<string>(201, closingFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 201)
      );
  }
  /// <summary>
  /// Windows a hot source with a closing selector returning timers of
  /// 100, 200, 300, ... ticks and checks the resulting window assignment,
  /// completion time and source subscription interval.
  /// </summary>
  [Fact]
  public void Window_Closings_Default()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Multiplier for the next closing timer; incremented every time the selector runs.
      var timerMultiplier = 1;

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(() => Observable.Timer(TimeSpan.FromTicks((timerMultiplier++) * 100), scheduler));
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "0 4"),
          OnNext(310, "1 5"),
          OnNext(340, "1 6"),
          OnNext(410, "1 7"),
          OnNext(420, "1 8"),
          OnNext(470, "1 9"),
          OnNext(550, "2 10"),
          OnCompleted<string>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  /// <summary>
  /// Opens a window for every element of an openings sequence and closes it
  /// with a timer whose length, in ticks, is that element's value. Checks the
  /// tagged output and the subscription intervals of source and openings.
  /// </summary>
  [Fact]
  public void Window_OpeningClosings_Basic()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Each value is the lifetime, in ticks, of the window opened at that time.
      var openings = scheduler.CreateHotObservable(
          OnNext(255, 50),
          OnNext(330, 100),
          OnNext(350, 50),
          OnNext(400, 90),
          OnCompleted<int>(900)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(openings, duration => Observable.Timer(TimeSpan.FromTicks(duration), scheduler));
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(260, "0 4"),
          OnNext(340, "1 6"),
          OnNext(410, "1 7"),
          OnNext(410, "3 7"),
          OnNext(420, "1 8"),
          OnNext(420, "3 8"),
          OnNext(470, "3 9"),
          OnCompleted<string>(900)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      const long sourceUnsubscribedAt = 590;
#else
      const long sourceUnsubscribedAt = 900;
#endif
      source.Subscriptions.AssertEqual(
          Subscribe(200, sourceUnsubscribedAt)
      );

      openings.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
  }
  /// <summary>
  /// Uses a per-opening closing selector that throws. The exception is expected
  /// on the output at tick 255, the time of the first opening, and both source
  /// and openings subscriptions are expected to end at 255.
  /// </summary>
  [Fact]
  public void Window_OpeningClosings_Throw()
  {
      var selectorFailure = new Exception();
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var openings = scheduler.CreateHotObservable(
          OnNext(255, 50),
          OnNext(330, 100),
          OnNext(350, 50),
          OnNext(400, 90),
          OnCompleted<int>(900)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window<int, int, int>(openings, duration => { throw selectorFailure; });
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnError<string>(255, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 255)
      );

      openings.Subscriptions.AssertEqual(
          Subscribe(200, 255)
      );
  }
  /// <summary>
  /// Runs the openings/closings scenario but disposes the subscription at tick
  /// 415. Only output up to tick 410 is expected, and both source and openings
  /// subscriptions are expected to end at 415.
  /// </summary>
  [Fact]
  public void Window_OpeningClosings_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Each value is the lifetime, in ticks, of the window opened at that time.
      var openings = scheduler.CreateHotObservable(
          OnNext(255, 50),
          OnNext(330, 100),
          OnNext(350, 50),
          OnNext(400, 90),
          OnCompleted<int>(900)
      );

      var observed = scheduler.Start(
          () =>
          {
              var opened = source.Window(openings, duration => Observable.Timer(TimeSpan.FromTicks(duration), scheduler));
              var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
              return tagged.Merge();
          },
          415
      );

      observed.Messages.AssertEqual(
          OnNext(260, "0 4"),
          OnNext(340, "1 6"),
          OnNext(410, "1 7"),
          OnNext(410, "3 7")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 415)
      );

      openings.Subscriptions.AssertEqual(
          Subscribe(200, 415)
      );
  }
  /// <summary>
  /// Runs the openings/closings scenario with a source that fails at tick 415.
  /// The same exception is expected on the output at 415 and both subscriptions
  /// are expected to end at 415.
  /// </summary>
  [Fact]
  public void Window_OpeningClosings_Data_Error()
  {
      var sourceFailure = new Exception();
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnError<int>(415, sourceFailure)
      );

      // Each value is the lifetime, in ticks, of the window opened at that time.
      var openings = scheduler.CreateHotObservable(
          OnNext(255, 50),
          OnNext(330, 100),
          OnNext(350, 50),
          OnNext(400, 90),
          OnCompleted<int>(900)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(openings, duration => Observable.Timer(TimeSpan.FromTicks(duration), scheduler));
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(260, "0 4"),
          OnNext(340, "1 6"),
          OnNext(410, "1 7"),
          OnNext(410, "3 7"),
          OnError<string>(415, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 415)
      );

      openings.Subscriptions.AssertEqual(
          Subscribe(200, 415)
      );
  }
  /// <summary>
  /// Runs the openings/closings scenario with an openings sequence that fails
  /// at tick 415. The same exception is expected on the output at 415 and both
  /// subscriptions are expected to end at 415.
  /// </summary>
  [Fact]
  public void Window_OpeningClosings_Window_Error()
  {
      var openingsFailure = new Exception();
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Each value is the lifetime, in ticks, of the window opened at that time.
      var openings = scheduler.CreateHotObservable(
          OnNext(255, 50),
          OnNext(330, 100),
          OnNext(350, 50),
          OnNext(400, 90),
          OnError<int>(415, openingsFailure)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(openings, duration => Observable.Timer(TimeSpan.FromTicks(duration), scheduler));
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(260, "0 4"),
          OnNext(340, "1 6"),
          OnNext(410, "1 7"),
          OnNext(410, "3 7"),
          OnError<string>(415, openingsFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 415)
      );

      openings.Subscriptions.AssertEqual(
          Subscribe(200, 415)
      );
  }
  /// <summary>
  /// Windows a hot source using a boundary sequence and checks the window index
  /// of each element. The output is expected to complete with the source at
  /// tick 590, and the boundary subscription is expected to end then as well.
  /// </summary>
  [Fact]
  public void Window_Boundaries_Simple()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = scheduler.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnNext(400, true),
          OnNext(500, true),
          OnCompleted<bool>(900)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(boundaries);
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "1 4"),
          OnNext(310, "1 5"),
          OnNext(340, "2 6"),
          OnNext(410, "4 7"),
          OnNext(420, "4 8"),
          OnNext(470, "4 9"),
          OnNext(550, "5 10"),
          OnCompleted<string>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  /// <summary>
  /// Windows a hot source using a boundary sequence that completes at tick 400,
  /// before the source does. The output is expected to complete at 400 and both
  /// subscriptions are expected to end at 400.
  /// </summary>
  [Fact]
  public void Window_Boundaries_OnCompletedBoundaries()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = scheduler.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnCompleted<bool>(400)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(boundaries);
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "1 4"),
          OnNext(310, "1 5"),
          OnNext(340, "2 6"),
          OnCompleted<string>(400)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Windows a source that fails at tick 400 using a boundary sequence that
  /// would complete later, at 500. The source's exception is expected on the
  /// output at 400 and both subscriptions are expected to end at 400.
  /// </summary>
  [Fact]
  public void Window_Boundaries_OnErrorSource()
  {
      var scheduler = new TestScheduler();
      var sourceFailure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(380, 7),
          OnError<int>(400, sourceFailure)
      );

      var boundaries = scheduler.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnCompleted<bool>(500)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(boundaries);
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "1 4"),
          OnNext(310, "1 5"),
          OnNext(340, "2 6"),
          OnNext(380, "3 7"),
          OnError<string>(400, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Windows a hot source using a boundary sequence that fails at tick 400.
  /// The boundary's exception is expected on the output at 400 and both
  /// subscriptions are expected to end at 400.
  /// </summary>
  [Fact]
  public void Window_Boundaries_OnErrorBoundaries()
  {
      var scheduler = new TestScheduler();
      var boundaryFailure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = scheduler.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnError<bool>(400, boundaryFailure)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(boundaries);
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(250, "0 3"),
          OnNext(260, "1 4"),
          OnNext(310, "1 5"),
          OnNext(340, "2 6"),
          OnError<string>(400, boundaryFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Checks argument validation of the count-based Window overloads: a null
  /// source must raise <see cref="ArgumentNullException"/>, and a count or skip
  /// of zero must raise <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void WindowWithCount_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var nullSource = default(IObservable<int>);

      // Window(source, count, skip)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(nullSource, 1, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, 0, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, 1, 0));

      // Window(source, count)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(nullSource, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, 0));
  }
  /// <summary>
  /// Windows a hot source with <c>Window(3, 2)</c> and checks the merged,
  /// index-tagged output. Elements shared by two overlapping windows are
  /// expected once per window, at the same tick.
  /// </summary>
  [Fact]
  public void WindowWithCount_Basic()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(3, 2);
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(280, "1 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(350, "2 6"),
          OnNext(380, "2 7"),
          OnNext(420, "2 8"),
          OnNext(420, "3 8"),
          OnNext(470, "3 9"),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Subscribes a separate recording observer to every window produced by
  /// <c>Window(3, 2)</c> and checks the exact messages each one receives.
  /// Five windows are expected; the last one only sees the completion at 600.
  /// </summary>
  [Fact]
  public void WindowWithCount_InnerTimings()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var windowed = default(IObservable<IObservable<int>>);
      var outerSubscription = default(IDisposable);
      var innerSubscriptions = new List<IDisposable>();
      var windows = new List<IObservable<int>>();
      var observers = new List<ITestableObserver<int>>();

      // Invoked for every window: records it and attaches a dedicated recording observer.
      Action<IObservable<int>> onWindow = window =>
      {
          var recorder = scheduler.CreateObserver<int>();
          windows.Add(window);
          observers.Add(recorder);

          // The inner subscription goes through the scheduler (absolute time 0) instead of being made inline.
          scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(recorder)));
      };

      scheduler.ScheduleAbsolute(Created, () => { windowed = source.Window(3, 2); });
      scheduler.ScheduleAbsolute(Subscribed, () => { outerSubscription = windowed.Subscribe(onWindow); });

      scheduler.Start();

      Assert.Equal(5, observers.Count);

      observers[0].Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnCompleted<int>(280)
      );

      observers[1].Messages.AssertEqual(
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnCompleted<int>(350)
      );

      observers[2].Messages.AssertEqual(
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnCompleted<int>(420)
      );

      observers[3].Messages.AssertEqual(
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      observers[4].Messages.AssertEqual(
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Same per-window recording as the inner-timings test, but the outer
  /// subscription is disposed at tick 400 while the inner ones are kept.
  /// Four windows are expected, all created before 400, and the already
  /// subscribed windows are expected to keep receiving elements and
  /// completions; the source subscription is expected to last until 600.
  /// </summary>
  [Fact]
  public void WindowWithCount_InnerTimings_DisposeOuter()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var windowed = default(IObservable<IObservable<int>>);
      var outerSubscription = default(IDisposable);
      var innerSubscriptions = new List<IDisposable>();
      var windows = new List<IObservable<int>>();
      var observers = new List<ITestableObserver<int>>();
      var windowCreationTimes = new List<long>();

      // Invoked for every window: notes the clock, records the window and attaches a recording observer.
      Action<IObservable<int>> onWindow = window =>
      {
          windowCreationTimes.Add(scheduler.Clock);

          var recorder = scheduler.CreateObserver<int>();
          windows.Add(window);
          observers.Add(recorder);

          // The inner subscription goes through the scheduler (absolute time 0) instead of being made inline.
          scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(recorder)));
      };

      scheduler.ScheduleAbsolute(Created, () => { windowed = source.Window(3, 2); });
      scheduler.ScheduleAbsolute(Subscribed, () => { outerSubscription = windowed.Subscribe(onWindow); });

      // Only the outer subscription goes away; inner subscriptions stay alive.
      scheduler.ScheduleAbsolute(400, () => outerSubscription.Dispose());

      scheduler.Start();

      var lastWindowCreatedAt = windowCreationTimes.Last();
      Assert.True(lastWindowCreatedAt < 400);

      Assert.Equal(4, observers.Count);

      observers[0].Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnCompleted<int>(280)
      );

      observers[1].Messages.AssertEqual(
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnCompleted<int>(350)
      );

      observers[2].Messages.AssertEqual(
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnCompleted<int>(420)
      );

      observers[3].Messages.AssertEqual(
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Same per-window recording as the inner-timings test, but at tick 400 the
  /// outer subscription and every inner subscription made so far are disposed.
  /// Windows still open at that point are expected to stop receiving messages,
  /// and the source subscription is expected to end at 400.
  /// </summary>
  [Fact]
  public void WindowWithCount_InnerTimings_DisposeOuterAndInners()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var windowed = default(IObservable<IObservable<int>>);
      var outerSubscription = default(IDisposable);
      var innerSubscriptions = new List<IDisposable>();
      var windows = new List<IObservable<int>>();
      var observers = new List<ITestableObserver<int>>();
      var windowCreationTimes = new List<long>();

      // Invoked for every window: notes the clock, records the window and attaches a recording observer.
      Action<IObservable<int>> onWindow = window =>
      {
          windowCreationTimes.Add(scheduler.Clock);

          var recorder = scheduler.CreateObserver<int>();
          windows.Add(window);
          observers.Add(recorder);

          // The inner subscription goes through the scheduler (absolute time 0) instead of being made inline.
          scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(recorder)));
      };

      scheduler.ScheduleAbsolute(Created, () => { windowed = source.Window(3, 2); });
      scheduler.ScheduleAbsolute(Subscribed, () => { outerSubscription = windowed.Subscribe(onWindow); });

      // Tear down the outer subscription first, then every inner one.
      scheduler.ScheduleAbsolute(400, () =>
      {
          outerSubscription.Dispose();

          foreach (var inner in innerSubscriptions)
          {
              inner.Dispose();
          }
      });

      scheduler.Start();

      var lastWindowCreatedAt = windowCreationTimes.Last();
      Assert.True(lastWindowCreatedAt < 400);

      Assert.Equal(4, observers.Count);

      observers[0].Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnCompleted<int>(280)
      );

      observers[1].Messages.AssertEqual(
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnCompleted<int>(350)
      );

      observers[2].Messages.AssertEqual(
          OnNext(350, 6),
          OnNext(380, 7)
      );

      // The fourth window is expected to have recorded nothing.
      observers[3].Messages.AssertEqual(
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Runs <c>Window(3, 2)</c> over a hot source and disposes the subscription
  /// at tick 370. Only output up to tick 350 is expected, and the source
  /// subscription is expected to end at 370.
  /// </summary>
  [Fact]
  public void WindowWithCount_Disposed()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observed = scheduler.Start(
          () =>
          {
              var opened = source.Window(3, 2);
              var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
              return tagged.Merge();
          },
          370
      );

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(280, "1 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(350, "2 6")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 370)
      );
  }
  /// <summary>
  /// Runs <c>Window(3, 2)</c> over a hot source that fails at tick 600. All
  /// elements are expected first, followed by the same exception at 600.
  /// </summary>
  [Fact]
  public void WindowWithCount_Error()
  {
      var sourceFailure = new Exception();
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, sourceFailure)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(3, 2);
          var tagged = opened.Select((w, index) => w.Select(value => index + " " + value));
          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(280, "1 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(350, "2 6"),
          OnNext(380, "2 7"),
          OnNext(420, "2 8"),
          OnNext(420, "3 8"),
          OnNext(470, "3 9"),
          OnError<string>(600, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Checks the contents of the second window produced by the count-based
  /// Window overloads when no test scheduler is involved: <c>Window(3)</c>
  /// over 1..10 must yield 4, 5, 6 and <c>Window(3, 2)</c> must yield 3, 4, 5.
  /// </summary>
  /// <remarks>
  /// The previous version only built <c>SequenceEqual</c> observables and
  /// discarded them, so nothing was ever subscribed or asserted (and the first
  /// statement was duplicated). Each window is now collected with
  /// <c>ToList</c> inside <c>SelectMany</c>, which subscribes to the window as
  /// soon as it is produced, and the second collected list is asserted.
  /// </remarks>
  [Fact]
  public void WindowWithCount_Default()
  {
      var secondWindow = Observable.Range(1, 10)
          .Window(3)
          .SelectMany(w => w.ToList())
          .Skip(1)
          .First();
      Assert.Equal(new[] { 4, 5, 6 }, secondWindow.ToArray());

      var secondOverlappingWindow = Observable.Range(1, 10)
          .Window(3, 2)
          .SelectMany(w => w.ToList())
          .Skip(1)
          .First();
      Assert.Equal(new[] { 3, 4, 5 }, secondOverlappingWindow.ToArray());
  }
  1036. #endregion
  1037. #region + Timed +
  /// <summary>
  /// Windows a hot source into 100-tick time windows on the test scheduler.
  /// Every window's elements are tagged with the window index and followed by
  /// an "end" marker, so the expected output also pins when each window closes.
  /// </summary>
  [Fact]
  public void Window_Time_Basic()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(270, 4),
          OnNext(320, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnNext(410, 8),
          OnNext(460, 9),
          OnNext(470, 10),
          OnCompleted<int>(490)
      );

      var observed = scheduler.Start(() =>
      {
          var opened = source.Window(TimeSpan.FromTicks(100), scheduler);

          var tagged = opened.Select((w, index) =>
          {
              var label = index.ToString();
              var items = w.Select(value => label + " " + value.ToString());

              // The trailing marker is emitted once the window itself has completed.
              return items.Concat(Observable.Return(label + " end"));
          });

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(270, "0 4"),
          OnNext(300, "0 end"),
          OnNext(320, "1 5"),
          OnNext(360, "1 6"),
          OnNext(390, "1 7"),
          OnNext(400, "1 end"),
          OnNext(410, "2 8"),
          OnNext(460, "2 9"),
          OnNext(470, "2 10"),
          OnNext(490, "2 end"),
          OnCompleted<string>(490)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 490)
      );
  }
  /// <summary>
  /// Same scenario as the basic 100-tick window test, but driven by a
  /// <c>PeriodicTestScheduler</c> so that the timer activity recorded by the scheduler
  /// can be checked as well (outside WINDOWS builds).
  /// </summary>
  [Fact]
  public void Window_Time_Basic_Periodic()
  {
      var periodicScheduler = new PeriodicTestScheduler();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(270, 4),
          OnNext(320, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnNext(410, 8),
          OnNext(460, 9),
          OnNext(470, 10),
          OnCompleted<int>(490)
      );

      var windowLength = TimeSpan.FromTicks(100);

      var observed = periodicScheduler.Start(() =>
      {
          var windows = source.Window(windowLength, periodicScheduler);
          var tagged = windows.Select((window, index) =>
              window.Select(value => index + " " + value)
                    .Concat(Observable.Return(index + " end")));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(270, "0 4"),
          OnNext(300, "0 end"),
          OnNext(320, "1 5"),
          OnNext(360, "1 6"),
          OnNext(390, "1 7"),
          OnNext(400, "1 end"),
          OnNext(410, "2 8"),
          OnNext(460, "2 9"),
          OnNext(470, "2 10"),
          OnNext(490, "2 end"),
          OnCompleted<string>(490)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 490)
      );

#if !WINDOWS
      // One timer run spanning 200..490 with entries recorded at 300 and 400.
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 490) { 300, 400 }
      );
#endif
  }
  /// <summary>
  /// 100-tick windows on a <c>PeriodicTestScheduler</c> where the source fails at 460.
  /// The error is expected to surface in the merged output at 460 without an "end"
  /// marker for the window that was open at that time.
  /// </summary>
  [Fact]
  public void Window_Time_Basic_Periodic_Error()
  {
      var periodicScheduler = new PeriodicTestScheduler();
      var failure = new Exception();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(270, 4),
          OnNext(320, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnNext(410, 8),
          OnError<int>(460, failure)
      );

      var windowLength = TimeSpan.FromTicks(100);

      var observed = periodicScheduler.Start(() =>
      {
          var windows = source.Window(windowLength, periodicScheduler);
          var tagged = windows.Select((window, index) =>
              window.Select(value => index + " " + value)
                    .Concat(Observable.Return(index + " end")));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(270, "0 4"),
          OnNext(300, "0 end"),
          OnNext(320, "1 5"),
          OnNext(360, "1 6"),
          OnNext(390, "1 7"),
          OnNext(400, "1 end"),
          OnNext(410, "2 8"),
          OnError<string>(460, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 460)
      );

#if !WINDOWS
      // One timer run spanning 200..460 with entries recorded at 300 and 400.
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 460) { 300, 400 }
      );
#endif
  }
  /// <summary>
  /// Uses the overload taking two time spans (100 ticks and 50 ticks).
  /// The expected output shows overlapping windows: most values appear in two
  /// windows, and "end" markers are expected every 50 ticks from 300 onwards.
  /// </summary>
  [Fact]
  public void Window_Time_Basic_Both()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(270, 4),
          OnNext(320, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnNext(410, 8),
          OnNext(460, 9),
          OnNext(470, 10),
          OnCompleted<int>(490)
      );

      var windowLength = TimeSpan.FromTicks(100);
      var windowShift = TimeSpan.FromTicks(50);

      var observed = testScheduler.Start(() =>
      {
          var windows = source.Window(windowLength, windowShift, testScheduler);
          var tagged = windows.Select((window, index) =>
              window.Select(value => index + " " + value)
                    .Concat(Observable.Return(index + " end")));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(270, "0 4"),
          OnNext(270, "1 4"),
          OnNext(300, "0 end"),
          OnNext(320, "1 5"),
          OnNext(320, "2 5"),
          OnNext(350, "1 end"),
          OnNext(360, "2 6"),
          OnNext(360, "3 6"),
          OnNext(390, "2 7"),
          OnNext(390, "3 7"),
          OnNext(400, "2 end"),
          OnNext(410, "3 8"),
          OnNext(410, "4 8"),
          OnNext(450, "3 end"),
          OnNext(460, "4 9"),
          OnNext(460, "5 9"),
          OnNext(470, "4 10"),
          OnNext(470, "5 10"),
          OnNext(490, "4 end"),
          OnNext(490, "5 end"),
          OnCompleted<string>(490)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 490)
      );
  }
  /// <summary>
  /// Argument validation for the time-based <c>Window</c> overloads: a null source or
  /// null scheduler must raise <see cref="ArgumentNullException"/>, and a negative
  /// time span must raise <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void WindowWithTime_ArgumentChecking()
  {
      var missingSource = default(IObservable<int>);
      var source = DummyObservable<int>.Instance;
      var scheduler = DummyScheduler.Instance;
      var oneTick = TimeSpan.FromTicks(1);
      var negativeTick = TimeSpan.FromTicks(-1);

      // (source, timeSpan, timeSpan, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(missingSource, oneTick, oneTick, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, negativeTick, oneTick, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, oneTick, negativeTick, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(source, oneTick, oneTick, null));

      // (source, timeSpan, timeSpan)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(missingSource, oneTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, negativeTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, oneTick, negativeTick));

      // (source, timeSpan, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(missingSource, oneTick, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, negativeTick, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(source, oneTick, null));

      // (source, timeSpan)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(missingSource, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, negativeTick));
  }
  /// <summary>
  /// Time spans of 100 ticks and 70 ticks passed to <c>Window</c>: the expected output
  /// shows the values at 280, 350 and 420 each appearing in two consecutive windows.
  /// </summary>
  [Fact]
  public void WindowWithTime_Basic1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var windowLength = TimeSpan.FromTicks(100);
      var windowShift = TimeSpan.FromTicks(70);

      var observed = testScheduler.Start(() =>
      {
          var windows = source.Window(windowLength, windowShift, testScheduler);
          var tagged = windows.Select((window, index) =>
              window.Select(item => index.ToString() + " " + item.ToString()));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(280, "1 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(350, "2 6"),
          OnNext(380, "2 7"),
          OnNext(420, "2 8"),
          OnNext(420, "3 8"),
          OnNext(470, "3 9"),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Time spans of 70 ticks and 100 ticks passed to <c>Window</c>: the expected output
  /// omits the source values at 280 and 380, i.e. they are expected to fall in no window.
  /// </summary>
  [Fact]
  public void WindowWithTime_Basic2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var windowLength = TimeSpan.FromTicks(70);
      var windowShift = TimeSpan.FromTicks(100);

      var observed = testScheduler.Start(() =>
      {
          var windows = source.Window(windowLength, windowShift, testScheduler);
          var tagged = windows.Select((window, index) =>
              window.Select(item => index.ToString() + " " + item.ToString()));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(420, "2 8"),
          OnNext(470, "2 9"),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Time spans of 100 ticks and 70 ticks passed to <c>Window</c>, with a source that
  /// fails at 600: the same exception instance is expected in the merged output at 600.
  /// </summary>
  [Fact]
  public void WindowWithTime_Error()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, failure)
      );

      var windowLength = TimeSpan.FromTicks(100);
      var windowShift = TimeSpan.FromTicks(70);

      var observed = testScheduler.Start(() =>
      {
          var windows = source.Window(windowLength, windowShift, testScheduler);
          var tagged = windows.Select((window, index) =>
              window.Select(item => index.ToString() + " " + item.ToString()));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(280, "1 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(350, "2 6"),
          OnNext(380, "2 7"),
          OnNext(420, "2 8"),
          OnNext(420, "3 8"),
          OnNext(470, "3 9"),
          OnError<string>(600, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Time spans of 100 ticks and 70 ticks passed to <c>Window</c>, with 370 passed to
  /// <c>Start</c> as the disposal time: no messages after 350 are expected and the
  /// source subscription is expected to end at 370.
  /// </summary>
  [Fact]
  public void WindowWithTime_Disposed()
  {
      const long disposeAt = 370;

      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var windowLength = TimeSpan.FromTicks(100);
      var windowShift = TimeSpan.FromTicks(70);

      var observed = testScheduler.Start(
          () =>
          {
              var windows = source.Window(windowLength, windowShift, testScheduler);
              var tagged = windows.Select((window, index) =>
                  window.Select(item => index.ToString() + " " + item.ToString()));

              return tagged.Merge();
          },
          disposeAt
      );

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(280, "1 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(350, "2 6")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, disposeAt)
      );
  }
  /// <summary>
  /// Single-time-span overload with 100 ticks: in the expected output every observed
  /// value carries exactly one window index (no value is duplicated or dropped after 200).
  /// </summary>
  [Fact]
  public void WindowWithTime_Basic_Same()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var windowLength = TimeSpan.FromTicks(100);

      var observed = testScheduler.Start(() =>
      {
          var windows = source.Window(windowLength, testScheduler);
          var tagged = windows.Select((window, index) =>
              window.Select(item => index.ToString() + " " + item.ToString()));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(380, "1 7"),
          OnNext(420, "2 8"),
          OnNext(470, "2 9"),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Time-based <c>Window</c> overloads that take no scheduler argument: with a one-day
  /// window, the first window of <c>Range(0, 10)</c> is asserted to contain 0..9.
  /// </summary>
  [Fact]
  public void WindowWithTime_Default()
  {
      var oneDay = TimeSpan.FromDays(1);
      var expected = Enumerable.Range(0, 10);

      // Two time spans (both one day).
      var firstWindowWithShift = Observable.Range(0, 10).Window(oneDay, oneDay).SelectMany(Observable.ToList).First();
      firstWindowWithShift.AssertEqual(expected);

      // Single time span.
      var firstWindow = Observable.Range(0, 10).Window(oneDay).SelectMany(Observable.ToList).First();
      firstWindow.AssertEqual(expected);
  }
  /// <summary>
  /// Argument validation for the <c>Window</c> overloads taking a time span and a count:
  /// a null source or null scheduler must raise <see cref="ArgumentNullException"/>;
  /// a negative time span or a count of zero must raise
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void WindowWithTimeOrCount_ArgumentChecking()
  {
      const int validCount = 1;
      const int zeroCount = 0;

      var missingSource = default(IObservable<int>);
      var source = DummyObservable<int>.Instance;
      var scheduler = DummyScheduler.Instance;
      var oneTick = TimeSpan.FromTicks(1);
      var negativeTick = TimeSpan.FromTicks(-1);

      // (source, timeSpan, count, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(missingSource, oneTick, validCount, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, negativeTick, validCount, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, oneTick, zeroCount, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(source, oneTick, validCount, null));

      // (source, timeSpan, count)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(missingSource, oneTick, validCount));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, negativeTick, validCount));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, oneTick, zeroCount));
  }
  /// <summary>
  /// <c>Window</c> with a 70-tick time span and a count of 3 on a <see cref="TestScheduler"/>.
  /// In the expected output windows 0 and 2 hold three values each, while windows 1, 3
  /// and 4 hold a single value.
  /// </summary>
  [Fact]
  public void WindowWithTimeOrCount_Basic()
  {
      const int maxCount = 3;

      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var windowLength = TimeSpan.FromTicks(70);

      var observed = testScheduler.Start(() =>
      {
          var windows = source.Window(windowLength, maxCount, testScheduler);
          var tagged = windows.Select((window, index) =>
              window.Select(item => index.ToString() + " " + item.ToString()));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "1 4"),
          OnNext(320, "2 5"),
          OnNext(350, "2 6"),
          OnNext(370, "2 7"),
          OnNext(420, "3 8"),
          OnNext(470, "4 9"),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// <c>Window</c> with a 70-tick time span and a count of 3, with a source that fails
  /// at 600: the same exception instance is expected in the merged output at 600.
  /// </summary>
  [Fact]
  public void WindowWithTimeOrCount_Error()
  {
      const int maxCount = 3;

      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, failure)
      );

      var windowLength = TimeSpan.FromTicks(70);

      var observed = testScheduler.Start(() =>
      {
          var windows = source.Window(windowLength, maxCount, testScheduler);
          var tagged = windows.Select((window, index) =>
              window.Select(item => index.ToString() + " " + item.ToString()));

          return tagged.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "1 4"),
          OnNext(320, "2 5"),
          OnNext(350, "2 6"),
          OnNext(370, "2 7"),
          OnNext(420, "3 8"),
          OnNext(470, "4 9"),
          OnError<string>(600, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// <c>Window</c> with a 70-tick time span and a count of 3, with 370 passed to
  /// <c>Start</c> as the disposal time: the last expected message is the value at 370
  /// and the source subscription is expected to end at 370.
  /// </summary>
  [Fact]
  public void WindowWithTimeOrCount_Disposed()
  {
      const int maxCount = 3;
      const long disposeAt = 370;

      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var windowLength = TimeSpan.FromTicks(70);

      var observed = testScheduler.Start(
          () =>
          {
              var windows = source.Window(windowLength, maxCount, testScheduler);
              var tagged = windows.Select((window, index) =>
                  window.Select(item => index.ToString() + " " + item.ToString()));

              return tagged.Merge();
          },
          disposeAt
      );

      observed.Messages.AssertEqual(
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "1 4"),
          OnNext(320, "2 5"),
          OnNext(350, "2 6"),
          OnNext(370, "2 7")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, disposeAt)
      );
  }
  /// <summary>
  /// Exercises the <c>Window</c> overload taking a time span and a count without a
  /// scheduler argument, using a one-day time span and a count of 3 over <c>Range(1, 10)</c>.
  /// </summary>
  /// <remarks>
  /// NOTE(review): the <c>SequenceEqual</c> result is discarded and never subscribed to,
  /// so this test only shows that building the query and fetching the second window does
  /// not throw. The comparison with 4, 5, 6 is not actually asserted - confirm intent.
  /// </remarks>
  [Fact]
  public void WindowWithTimeOrCount_Default()
  {
      var oneDay = TimeSpan.FromDays(1);

      var secondWindow = Observable.Range(1, 10).Window(oneDay, 3).Skip(1).First();
      secondWindow.SequenceEqual(Observable.Range(4, 3));
  }
  1526. #endregion
  1527. }
  1528. }