BufferTest.cs 44 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. namespace ReactiveTests.Tests
  13. {
  14. [TestClass]
  15. public class BufferTest : ReactiveTest
  16. {
  17. #region + Boundary +
  // Null-argument checks for the selector/boundary based Buffer overloads.
  // Every call below passes exactly one null argument and is expected to
  // throw ArgumentNullException.
  [TestMethod]
  public void Buffer_ArgumentChecking()
  {
      IObservable<int> missing = null;
      var dummy = DummyObservable<int>.Instance;

      // source + closing selector
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, DummyFunc<IObservable<int>>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummy, default(Func<IObservable<int>>)));

      // source + openings + closing selector
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, dummy, DummyFunc<int, IObservable<int>>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummy, missing, DummyFunc<int, IObservable<int>>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummy, dummy, default(Func<int, IObservable<int>>)));

      // source + boundaries
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, dummy));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummy, missing));
  }
  // Buffers a hot source with a closing selector that hands out timers whose
  // due time is 100 ticks on the first call and grows by 100 ticks per call.
  // Expected buffers: [3,4] at 300, [5..9] at 500 and [10] at 590, where the
  // sequence also completes.
  [TestMethod]
  public void Buffer_Closings_Basic()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Number of the buffer being opened; drives the timer length.
      var ordinal = 1;
      Func<IObservable<long>> nextClosing = () =>
      {
          var due = TimeSpan.FromTicks(ordinal * 100);
          ordinal++;
          return Observable.Timer(due, sched);
      };

      var observer = sched.Start(() => source.Buffer(nextClosing));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, items => items.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(500, items => items.SequenceEqual(new[] { 5, 6, 7, 8, 9 })),
          OnNext<IList<int>>(590, items => items.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 590));
  }
  // Uses four pre-built hot closing sequences, handed out one per buffer, and
  // verifies both the emitted buffers and the subscription interval recorded
  // on each closing sequence.
  [TestMethod]
  public void Buffer_Closings_InnerSubscriptions()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var closings = new ITestableObservable<bool>[]
      {
          sched.CreateHotObservable(
              OnNext(300, true),
              OnNext(350, false),
              OnCompleted<bool>(380)
          ),
          sched.CreateHotObservable(
              OnNext(400, true),
              OnNext(510, false),
              OnNext(620, false)
          ),
          sched.CreateHotObservable(
              OnCompleted<bool>(500)
          ),
          sched.CreateHotObservable(
              OnNext(600, true)
          )
      };

      // Index of the closing sequence to hand out on the next selector call.
      var next = 0;

      var observer = sched.Start(() => source.Buffer(() => closings[next++]));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, items => items.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(400, items => items.SequenceEqual(new[] { 5, 6 })),
          OnNext<IList<int>>(500, items => items.SequenceEqual(new[] { 7, 8, 9 })),
          OnNext<IList<int>>(590, items => items.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 590));

      // Expected subscription interval for closings[0] .. closings[3], in order.
      var expectedIntervals = new[]
      {
          Subscribe(200, 300),
          Subscribe(300, 400),
          Subscribe(400, 500),
          Subscribe(500, 590)
      };

      for (var i = 0; i < expectedIntervals.Length; i++)
      {
          closings[i].Subscriptions.AssertEqual(expectedIntervals[i]);
      }
  }
  // Same scenario as the timer-based closing test, but every closing sequence
  // is an empty sequence delayed by 100, 200, 300, ... ticks. The expected
  // buffers are [3,4] at 300, [5..9] at 500 and [10] at 590.
  [TestMethod]
  public void Buffer_Closings_Empty()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Number of the buffer being opened; drives the delay length.
      var ordinal = 1;
      Func<IObservable<int>> nextClosing = () =>
      {
          var delay = TimeSpan.FromTicks(ordinal * 100);
          ordinal++;
          return Observable.Empty<int>().Delay(delay, sched);
      };

      var observer = sched.Start(() => source.Buffer(nextClosing));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, items => items.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(500, items => items.SequenceEqual(new[] { 5, 6, 7, 8, 9 })),
          OnNext<IList<int>>(590, items => items.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 590));
  }
  // Runs the timer-based closing scenario but disposes the subscription at
  // tick 400. Only the first buffer ([3,4] at 300) is expected, and the source
  // subscription is expected to end at 400.
  [TestMethod]
  public void Buffer_Closings_Dispose()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Number of the buffer being opened; drives the timer length.
      var ordinal = 1;
      Func<IObservable<long>> nextClosing = () =>
      {
          var due = TimeSpan.FromTicks(ordinal * 100);
          ordinal++;
          return Observable.Timer(due, sched);
      };

      var observer = sched.Start(() => source.Buffer(nextClosing), 400);

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, items => items.SequenceEqual(new[] { 3, 4 }))
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  // Timer-based closing scenario where the source fails at 590. Expected:
  // [3,4] at 300, [5..9] at 500, then the source's error at 590 (no final
  // buffer is asserted before the error).
  [TestMethod]
  public void Buffer_Closings_Error()
  {
      var error = new Exception();
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, error)
      );

      // Number of the buffer being opened; drives the timer length.
      var ordinal = 1;
      Func<IObservable<long>> nextClosing = () =>
      {
          var due = TimeSpan.FromTicks(ordinal * 100);
          ordinal++;
          return Observable.Timer(due, sched);
      };

      var observer = sched.Start(() => source.Buffer(nextClosing));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, items => items.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(500, items => items.SequenceEqual(new[] { 5, 6, 7, 8, 9 })),
          OnError<IList<int>>(590, error)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 590));
  }
  // The closing selector itself throws. The thrown exception is expected to
  // surface as OnError at 200, with the source subscription recorded as
  // (200, 200).
  [TestMethod]
  public void Buffer_Closings_Throw()
  {
      var selectorError = new Exception();
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          // A different exception instance than the one the selector throws.
          OnError<int>(590, new Exception())
      );

      Func<IObservable<int>> failingSelector = () => { throw selectorError; };

      var observer = sched.Start(() => source.Buffer<int, int>(failingSelector));

      observer.Messages.AssertEqual(
          OnError<IList<int>>(200, selectorError)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 200));
  }
  // The closing sequence is Observable.Throw scheduled on the test scheduler.
  // Its exception is expected as OnError at 201, with the source subscription
  // recorded as (200, 201).
  [TestMethod]
  public void Buffer_Closings_WindowClose_Error()
  {
      var closingError = new Exception();
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          // A different exception instance than the one the closing raises.
          OnError<int>(590, new Exception())
      );

      Func<IObservable<int>> failingClosing = () => Observable.Throw<int>(closingError, sched);

      var observer = sched.Start(() => source.Buffer(failingClosing));

      observer.Messages.AssertEqual(
          OnError<IList<int>>(201, closingError)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 201));
  }
  // Opening/closing overload: every value from the openings sequence starts a
  // buffer that is closed by a timer of that many ticks. Verifies the emitted
  // buffers and the subscription intervals of both inputs.
  [TestMethod]
  public void Buffer_OpeningClosings_Basic()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Each value is the length, in ticks, of the buffer it opens.
      var openings = sched.CreateHotObservable(
          OnNext(255, 50),
          OnNext(330, 100),
          OnNext(350, 50),
          OnNext(400, 90),
          OnCompleted<int>(900)
      );

      Func<int, IObservable<long>> closeAfter = ticks => Observable.Timer(TimeSpan.FromTicks(ticks), sched);

      var observer = sched.Start(() => source.Buffer(openings, closeAfter));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(305, items => items.SequenceEqual(new[] { 4 })),
          OnNext<IList<int>>(400, items => items.SequenceEqual(new int[0])),
          OnNext<IList<int>>(430, items => items.SequenceEqual(new[] { 6, 7, 8 })),
          OnNext<IList<int>>(490, items => items.SequenceEqual(new[] { 7, 8, 9 })),
          OnCompleted<IList<int>>(900)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      source.Subscriptions.AssertEqual(Subscribe(200, 590));
#else
      source.Subscriptions.AssertEqual(Subscribe(200, 900));
#endif

      openings.Subscriptions.AssertEqual(Subscribe(200, 900));
  }
  // Boundary overload: a buffer is expected at every boundary tick (255, 330,
  // 350, 400, 500) plus a final one when the source completes at 590. Both
  // subscriptions are expected to end at 590.
  [TestMethod]
  public void Buffer_Boundaries_Simple()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = sched.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnNext(400, true),
          OnNext(500, true),
          OnCompleted<bool>(900)
      );

      var observer = sched.Start(() => source.Buffer(boundaries));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(255, items => items.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, items => items.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, items => items.SequenceEqual(new[] { 6 })),
          OnNext<IList<int>>(400, items => items.SequenceEqual(new int[0])),
          OnNext<IList<int>>(500, items => items.SequenceEqual(new[] { 7, 8, 9 })),
          OnNext<IList<int>>(590, items => items.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 590));
      boundaries.Subscriptions.AssertEqual(Subscribe(200, 590));
  }
  // Boundary overload where the boundary sequence completes (at 400) before
  // the source does. Expected: buffers at 255, 330, 350, an empty buffer at
  // 400, then completion at 400; both subscriptions end at 400.
  [TestMethod]
  public void Buffer_Boundaries_OnCompletedBoundaries()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = sched.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnCompleted<bool>(400)
      );

      var observer = sched.Start(() => source.Buffer(boundaries));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(255, items => items.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, items => items.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, items => items.SequenceEqual(new[] { 6 })),
          OnNext<IList<int>>(400, items => items.SequenceEqual(new int[0])),
          OnCompleted<IList<int>>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
      boundaries.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  // Boundary overload where the source fails at 400. Expected: buffers at
  // 255, 330 and 350, then the source's error at 400; both subscriptions end
  // at 400.
  [TestMethod]
  public void Buffer_Boundaries_OnErrorSource()
  {
      var sched = new TestScheduler();
      var error = new Exception();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(380, 7),
          OnError<int>(400, error)
      );

      var boundaries = sched.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnCompleted<bool>(500)
      );

      var observer = sched.Start(() => source.Buffer(boundaries));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(255, items => items.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, items => items.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, items => items.SequenceEqual(new[] { 6 })),
          OnError<IList<int>>(400, error)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
      boundaries.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  // Boundary overload where the boundary sequence fails at 400. Expected:
  // buffers at 255, 330 and 350, then the boundary's error at 400; both
  // subscriptions end at 400.
  [TestMethod]
  public void Buffer_Boundaries_OnErrorBoundaries()
  {
      var sched = new TestScheduler();
      var error = new Exception();

      var source = sched.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = sched.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnError<bool>(400, error)
      );

      var observer = sched.Start(() => source.Buffer(boundaries));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(255, items => items.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, items => items.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, items => items.SequenceEqual(new[] { 6 })),
          OnError<IList<int>>(400, error)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
      boundaries.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  477. #endregion
  478. #region + Count +
  // Argument validation for the count-based overloads: a null source must
  // throw ArgumentNullException; a count or skip of zero, or a negative count,
  // must throw ArgumentOutOfRangeException.
  [TestMethod]
  public void Buffer_Single_ArgumentChecking()
  {
      IObservable<int> missing = null;
      var empty = Observable.Empty<int>();

      // Buffer(source, count)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(empty, 0));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(empty, -1));

      // Buffer(source, count, skip)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, 1, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(empty, 1, 0));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(empty, 0, 1));
  }
  // Buffer(5) over a source that delivers only four values after subscription:
  // the partial buffer [2,3,4,5] is expected at completion time (250).
  [TestMethod]
  public void Buffer_Count_PartialWindow()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = sched.Start(() => source.Buffer(5));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(250, items => items.SequenceEqual(new[] { 2, 3, 4, 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 250));
  }
  // Buffer(2) over four post-subscription values: two full buffers are
  // expected ([2,3] at 220, [4,5] at 240) and no extra buffer at completion.
  [TestMethod]
  public void Buffer_Count_FullWindows()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = sched.Start(() => source.Buffer(2));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(220, items => items.SequenceEqual(new[] { 2, 3 })),
          OnNext<IList<int>>(240, items => items.SequenceEqual(new[] { 4, 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 250));
  }
  // Buffer(3) over four post-subscription values: one full buffer [2,3,4] at
  // 230 and the partial remainder [5] at completion (250).
  [TestMethod]
  public void Buffer_Count_FullAndPartialWindows()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = sched.Start(() => source.Buffer(3));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(230, items => items.SequenceEqual(new[] { 2, 3, 4 })),
          OnNext<IList<int>>(250, items => items.SequenceEqual(new[] { 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 250));
  }
  // Buffer(5) over a source that fails at 250 before the buffer fills: only
  // the error is expected, with no partial buffer.
  [TestMethod]
  public void Buffer_Count_Error()
  {
      var error = new Exception();
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnError<int>(250, error)
      );

      var observer = sched.Start(() => source.Buffer(5));

      observer.Messages.AssertEqual(
          OnError<IList<int>>(250, error)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 250));
  }
  // Buffer(count: 3, skip: 1), i.e. skip smaller than count. Expected: the
  // overlapping buffers [2,3,4] and [3,4,5], then the two partial buffers
  // [4,5] and [5] at completion (250).
  [TestMethod]
  public void Buffer_Count_Skip_Less()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = sched.Start(() => source.Buffer(3, 1));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(230, items => items.SequenceEqual(new[] { 2, 3, 4 })),
          OnNext<IList<int>>(240, items => items.SequenceEqual(new[] { 3, 4, 5 })),
          OnNext<IList<int>>(250, items => items.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(250, items => items.SequenceEqual(new[] { 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 250));
  }
  // Buffer(count: 2, skip: 3), i.e. skip larger than count. Expected: [2,3]
  // at 220 and the partial buffer [5] at completion (250); value 4 appears in
  // no expected buffer.
  [TestMethod]
  public void Buffer_Count_Skip_More()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = sched.Start(() => source.Buffer(2, 3));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(220, items => items.SequenceEqual(new[] { 2, 3 })),
          OnNext<IList<int>>(250, items => items.SequenceEqual(new[] { 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 250));
  }
  // Argument validation for Buffer(source, count[, skip]) using the dummy
  // observable: null source -> ArgumentNullException, zero count or skip ->
  // ArgumentOutOfRangeException.
  [TestMethod]
  public void BufferWithCount_ArgumentChecking()
  {
      IObservable<int> missing = null;
      var dummy = DummyObservable<int>.Instance;

      // Buffer(source, count, skip)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, 1, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummy, 0, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummy, 1, 0));

      // Buffer(source, count)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummy, 0));
  }
  // Buffer(count: 3, skip: 2) with each buffer rendered as a comma-separated
  // string. Expected: "2,3,4", "4,5,6", "6,7,8", then the partial "8,9" at
  // completion (600).
  [TestMethod]
  public void BufferWithCount_Basic()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(3, 2).Select(render));

      observer.Messages.AssertEqual(
          OnNext(280, "2,3,4"),
          OnNext(350, "4,5,6"),
          OnNext(420, "6,7,8"),
          OnNext(600, "8,9"),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Buffer(count: 3, skip: 2) with the subscription disposed at 370: only
  // "2,3,4" and "4,5,6" are expected, and the source subscription ends at 370.
  [TestMethod]
  public void BufferWithCount_Disposed()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(3, 2).Select(render), 370);

      observer.Messages.AssertEqual(
          OnNext(280, "2,3,4"),
          OnNext(350, "4,5,6")
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 370));
  }
  // Buffer(count: 3, skip: 2) over a source that fails at 600: three full
  // buffers are expected, followed by the error (no partial buffer).
  [TestMethod]
  public void BufferWithCount_Error()
  {
      var error = new Exception();
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, error)
      );

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(3, 2).Select(render));

      observer.Messages.AssertEqual(
          OnNext(280, "2,3,4"),
          OnNext(350, "4,5,6"),
          OnNext(420, "6,7,8"),
          OnError<string>(600, error)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Count-based Buffer over Observable.Range(1, 10) without an explicit
  // scheduler: checks the contents of the second buffer for Buffer(3) and
  // for Buffer(3, 2).
  [TestMethod]
  public void BufferWithCount_Default()
  {
      var secondOfThrees = Observable.Range(1, 10).Buffer(3).Skip(1).First();
      secondOfThrees.AssertEqual(4, 5, 6);

      var secondOverlapping = Observable.Range(1, 10).Buffer(3, 2).Skip(1).First();
      secondOverlapping.AssertEqual(3, 4, 5);
  }
  736. #endregion
  737. #region + Time +
  // Null checks for the time-based overloads: each call passes either a null
  // source or a null scheduler and must throw ArgumentNullException.
  [TestMethod]
  public void Buffer_Time_ArgumentChecking()
  {
      var sched = new TestScheduler();
      var empty = Observable.Empty<int>();
      IObservable<int> missing = null;
      IScheduler noScheduler = null;

      // Buffer(source, timeSpan[, scheduler])
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(empty, TimeSpan.Zero, noScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, TimeSpan.Zero, sched));

      // Buffer(source, timeSpan, timeShift[, scheduler])
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, TimeSpan.Zero, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(empty, TimeSpan.Zero, TimeSpan.Zero, noScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, TimeSpan.Zero, TimeSpan.Zero, sched));
  }
  // Argument validation for the time-based overloads using dummy inputs:
  // null source or scheduler -> ArgumentNullException; a negative time span
  // in either position -> ArgumentOutOfRangeException.
  [TestMethod]
  public void BufferWithTime_ArgumentChecking()
  {
      IObservable<int> missing = null;
      IScheduler noScheduler = null;
      var dummy = DummyObservable<int>.Instance;
      var dummyScheduler = DummyScheduler.Instance;
      var oneTick = TimeSpan.FromTicks(1);
      var negative = TimeSpan.FromTicks(-1);

      // Two time spans + scheduler
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, oneTick, oneTick, dummyScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummy, negative, oneTick, dummyScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummy, oneTick, negative, dummyScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummy, oneTick, oneTick, noScheduler));

      // Two time spans
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, oneTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummy, negative, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummy, oneTick, negative));

      // One time span + scheduler
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, oneTick, dummyScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummy, negative, dummyScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummy, oneTick, noScheduler));

      // One time span
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(missing, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummy, negative));
  }
  // Time-based Buffer with a 100-tick span and a 70-tick shift (shift smaller
  // than span). Buffers are rendered as comma-separated strings; two empty
  // buffers are expected at 580 and 600 before completion at 600.
  [TestMethod]
  public void BufferWithTime_Basic1()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var span = TimeSpan.FromTicks(100);
      var shift = TimeSpan.FromTicks(70);

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(span, shift, sched).Select(render));

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(370, "4,5,6"),
          OnNext(440, "6,7,8"),
          OnNext(510, "8,9"),
          OnNext(580, ""),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Time-based Buffer with a 70-tick span and a 100-tick shift (shift larger
  // than span). Expected: "2,3" at 270, "5,6" at 370, "8,9" at 470, an empty
  // buffer at 570, then completion at 600.
  [TestMethod]
  public void BufferWithTime_Basic2()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var span = TimeSpan.FromTicks(70);
      var shift = TimeSpan.FromTicks(100);

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(span, shift, sched).Select(render));

      observer.Messages.AssertEqual(
          OnNext(270, "2,3"),
          OnNext(370, "5,6"),
          OnNext(470, "8,9"),
          OnNext(570, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Time-based Buffer (100-tick span, 70-tick shift) over a source that fails
  // at 600. The buffers up to 580 are expected, followed by the error at 600.
  [TestMethod]
  public void BufferWithTime_Error()
  {
      var error = new Exception();
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, error)
      );

      var span = TimeSpan.FromTicks(100);
      var shift = TimeSpan.FromTicks(70);

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(span, shift, sched).Select(render));

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(370, "4,5,6"),
          OnNext(440, "6,7,8"),
          OnNext(510, "8,9"),
          OnNext(580, ""),
          OnError<string>(600, error)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Time-based Buffer (100-tick span, 70-tick shift) with the subscription
  // disposed at 370: only "2,3,4" at 300 is expected, and the source
  // subscription ends at 370.
  [TestMethod]
  public void BufferWithTime_Disposed()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var span = TimeSpan.FromTicks(100);
      var shift = TimeSpan.FromTicks(70);

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(span, shift, sched).Select(render), 370);

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4")
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 370));
  }
  // Single-span time-based Buffer (100 ticks) on a TestScheduler. Expected:
  // "2,3,4" at 300, "5,6,7" at 400, "8,9" at 500 and an empty buffer at 600,
  // where the sequence also completes.
  [TestMethod]
  public void BufferWithTime_Basic_Same()
  {
      var sched = new TestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var span = TimeSpan.FromTicks(100);

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(span, sched).Select(render));

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnNext(500, "8,9"),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  // Same scenario as the single-span test, but run on a PeriodicTestScheduler.
  // Outside WINDOWS builds it additionally asserts the timer run the scheduler
  // recorded: started at 200, stopped at 600, ticks at 300, 400 and 500.
  [TestMethod]
  public void BufferWithTime_Basic_Same_Periodic()
  {
      var sched = new PeriodicTestScheduler();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var span = TimeSpan.FromTicks(100);

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(span, sched).Select(render));

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnNext(500, "8,9"),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 600));

#if !WINDOWS
      sched.Timers.AssertEqual(
          new TimerRun(200, 600) { 300, 400, 500 }
      );
#endif
  }
  // Single-span time-based Buffer on a PeriodicTestScheduler with a source
  // that fails at 480. Expected: "2,3,4" at 300, "5,6,7" at 400, then the
  // error at 480. Outside WINDOWS builds the recorded timer run is also
  // asserted: started at 200, stopped at 480, ticks at 300 and 400.
  [TestMethod]
  public void BufferWithTime_Basic_Same_Periodic_Error()
  {
      var sched = new PeriodicTestScheduler();
      var error = new Exception();

      var source = sched.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(480, error)
      );

      var span = TimeSpan.FromTicks(100);

      // Renders a buffer as "a,b,c" so it can be compared as a plain string.
      Func<IList<int>, string> render = items => string.Join(",", items.Select(v => v.ToString()).ToArray());

      var observer = sched.Start(() => source.Buffer(span, sched).Select(render));

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnError<string>(480, error)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 480));

#if !WINDOWS
      sched.Timers.AssertEqual(
          new TimerRun(200, 480) { 300, 400 }
      );
#endif
  }
  /// <summary>
  /// Exercises the time-based Buffer overloads that take no scheduler argument: with a
  /// one-day time span, the first buffer of <c>Observable.Range(0, 10)</c> is expected to
  /// contain all ten values.
  /// </summary>
  [TestMethod]
  public void BufferWithTime_Default()
  {
      var oneDay = TimeSpan.FromDays(1);
      var expected = Enumerable.Range(0, 10);

      // Overload taking both a time span and a time shift.
      var shifted = Observable.Range(0, 10).Buffer(oneDay, oneDay).First();
      shifted.AssertEqual(expected);

      // Overload taking only a time span.
      var plain = Observable.Range(0, 10).Buffer(oneDay).First();
      plain.AssertEqual(expected);
  }
  /// <summary>
  /// Verifies argument validation of the time-or-count Buffer overloads, with and without
  /// an explicit scheduler: a null source or null scheduler must raise
  /// <see cref="ArgumentNullException"/>, while a negative time span or a zero count must
  /// raise <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [TestMethod]
  public void BufferWithTimeOrCount_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var scheduler = DummyScheduler.Instance;
      var nullSource = default(IObservable<int>);
      var oneTick = TimeSpan.FromTicks(1);
      var negativeTick = TimeSpan.FromTicks(-1);

      // Overload with an explicit scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick, 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(source, negativeTick, 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(source, oneTick, 0, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(source, oneTick, 1, null));

      // Overload without a scheduler argument.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(source, negativeTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(source, oneTick, 0));
  }
  /// <summary>
  /// Buffers a hot source with a 70-tick time span and a count limit of 3 on a
  /// <c>TestScheduler</c>, then checks the comma-joined buffers, including the empty ones
  /// at 580 and 600, and the recorded source subscription.
  /// </summary>
  [TestMethod]
  public void BufferWithTimeOrCount_Basic()
  {
      var scheduler = new TestScheduler();
      var timeSpan = TimeSpan.FromTicks(70);
      const int count = 3;

      var source = scheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var results = scheduler.Start(() =>
          source
              .Buffer(timeSpan, count, scheduler)
              .Select(buffer => string.Join(",", buffer.Select(value => value.ToString())))
      );

      // Expected buffers mix full ones (three items at 240 and 370) with shorter or empty ones.
      results.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7"),
          OnNext(440, "8"),
          OnNext(510, "9"),
          OnNext(580, ""),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Same source values and operator arguments (70 ticks, count 3) as the basic
  /// time-or-count test, but the source fails at tick 600. Checks that the buffers up to
  /// 580 are emitted and that the same exception instance is forwarded at 600 with no
  /// further buffer.
  /// </summary>
  [TestMethod]
  public void BufferWithTimeOrCount_Error()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();
      var timeSpan = TimeSpan.FromTicks(70);
      const int count = 3;

      var source = scheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, failure)
      );

      var results = scheduler.Start(() =>
          source
              .Buffer(timeSpan, count, scheduler)
              .Select(buffer => string.Join(",", buffer.Select(value => value.ToString())))
      );

      results.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7"),
          OnNext(440, "8"),
          OnNext(510, "9"),
          OnNext(580, ""),
          OnError<string>(600, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Runs the time-or-count buffer (70 ticks, count 3) with 370 passed as the second
  /// argument to <c>scheduler.Start</c>, and checks that only the buffers up to and
  /// including 370 are observed and that the recorded source subscription ends at 370.
  /// </summary>
  [TestMethod]
  public void BufferWithTimeOrCount_Disposed()
  {
      var scheduler = new TestScheduler();
      var timeSpan = TimeSpan.FromTicks(70);
      const int count = 3;
      const long disposeAt = 370;

      var source = scheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var results = scheduler.Start(
          () => source
              .Buffer(timeSpan, count, scheduler)
              .Select(buffer => string.Join(",", buffer.Select(value => value.ToString()))),
          disposeAt
      );

      // The buffer completed by count at 370 is still expected to be delivered.
      results.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, disposeAt)
      );
  }
  /// <summary>
  /// Exercises the time-or-count Buffer overload that takes no scheduler argument: with a
  /// one-day time span and a count of 3 over <c>Observable.Range(1, 10)</c>, the second
  /// buffer is expected to be 4, 5, 6.
  /// </summary>
  [TestMethod]
  public void BufferWithTimeOrCount_Default()
  {
      var buffers = Observable
          .Range(1, 10, DefaultScheduler.Instance)
          .Buffer(TimeSpan.FromDays(1), 3);

      // Skip the first buffer and inspect the second one.
      var secondBuffer = buffers.Skip(1).First();

      secondBuffer.AssertEqual(4, 5, 6);
  }
  /// <summary>
  /// Subscribes to <c>Observable.Return(1)</c> buffered with a 1 ms time span and a 2 ms
  /// time shift on a <c>TestScheduler</c>, and advances that scheduler by 1 ms from inside
  /// the buffer callback. The test contains no assertions; it passes if nothing throws.
  /// </summary>
  [TestMethod]
  public void BufferWithTime_TickWhileOnCompleted()
  {
      // NOTE(review): presumably guards against a timer tick firing re-entrantly while the
      // final buffer is being delivered - confirm against the operator implementation.
      var scheduler = new TestScheduler();
      var timeSpan = TimeSpan.FromMilliseconds(1);
      var timeShift = TimeSpan.FromMilliseconds(2);

      var buffered = Observable.Return(1).Buffer(timeSpan, timeShift, scheduler);

      buffered.Subscribe(_ => scheduler.AdvanceBy(timeSpan.Ticks));
  }
  1114. #endregion
  1115. }
  1116. }