BufferTest.cs 43 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Xunit;
  12. namespace ReactiveTests.Tests
  13. {
  14. public class BufferTest : ReactiveTest
  15. {
  16. #region + Boundary +
  /// <summary>
  /// Verifies that the closing-selector, opening/closing and boundary overloads of
  /// <c>Observable.Buffer</c> throw <see cref="ArgumentNullException"/> for each null argument.
  /// </summary>
  [Fact]
  public void Buffer_ArgumentChecking()
  {
      // Buffer(source, bufferClosingSelector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, DummyFunc<IObservable<int>>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, (Func<IObservable<int>>)null));

      // Buffer(source, bufferOpenings, bufferClosingSelector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, (IObservable<int>)null, DummyFunc<int, IObservable<int>>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, DummyObservable<int>.Instance, (Func<int, IObservable<int>>)null));

      // Buffer(source, bufferBoundaries)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, DummyObservable<int>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, (IObservable<int>)null));
  }
  /// <summary>
  /// Buffers a hot source with a closing selector that returns timers of growing length
  /// (100 ticks, then 200, ...) and checks the emitted buffers and the source subscription.
  /// </summary>
  [Fact]
  public void Buffer_Closings_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Every invocation of the selector uses the current multiplier and then bumps it.
      var multiplier = 1;
      Func<IObservable<long>> nextClosing = () =>
      {
          var dueTime = TimeSpan.FromTicks(multiplier * 100);
          multiplier++;
          return Observable.Timer(dueTime, testScheduler);
      };

      var observer = testScheduler.Start(() => source.Buffer(nextClosing));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 5, 6, 7, 8, 9 })),
          OnNext<IList<int>>(590, buffer => buffer.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  /// <summary>
  /// Uses four pre-built hot closing sequences, handed out one per buffer, and checks
  /// the emitted buffers as well as the subscription interval of the source and of each closing sequence.
  /// </summary>
  [Fact]
  public void Buffer_Closings_InnerSubscriptions()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var closingSources = new ITestableObservable<bool>[]
      {
          testScheduler.CreateHotObservable(
              OnNext(300, true),
              OnNext(350, false),
              OnCompleted<bool>(380)
          ),
          testScheduler.CreateHotObservable(
              OnNext(400, true),
              OnNext(510, false),
              OnNext(620, false)
          ),
          testScheduler.CreateHotObservable(
              OnCompleted<bool>(500)
          ),
          testScheduler.CreateHotObservable(
              OnNext(600, true)
          )
      };

      // The selector returns the closing sequences in array order, one per call.
      var nextIndex = 0;

      var observer = testScheduler.Start(() =>
          source.Buffer(() => closingSources[nextIndex++])
      );

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(400, buffer => buffer.SequenceEqual(new[] { 5, 6 })),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 7, 8, 9 })),
          OnNext<IList<int>>(590, buffer => buffer.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );

      // Expected subscription interval for each closing sequence, in array order.
      var expectedClosingSubscriptions = new[]
      {
          Subscribe(200, 300),
          Subscribe(300, 400),
          Subscribe(400, 500),
          Subscribe(500, 590)
      };

      for (var i = 0; i < closingSources.Length; i++)
      {
          closingSources[i].Subscriptions.AssertEqual(
              expectedClosingSubscriptions[i]
          );
      }
  }
  /// <summary>
  /// Uses delayed empty sequences (100 ticks, then 200, ...) as buffer closings and
  /// checks the emitted buffers and the source subscription.
  /// </summary>
  [Fact]
  public void Buffer_Closings_Empty()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Every invocation of the selector uses the current multiplier and then bumps it.
      var multiplier = 1;
      Func<IObservable<int>> nextClosing = () =>
      {
          var delay = TimeSpan.FromTicks(multiplier * 100);
          multiplier++;
          return Observable.Empty<int>().Delay(delay, testScheduler);
      };

      var observer = testScheduler.Start(() => source.Buffer(nextClosing));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 5, 6, 7, 8, 9 })),
          OnNext<IList<int>>(590, buffer => buffer.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  /// <summary>
  /// Runs the timer-closing buffer with the test scheduler's dispose time set to 400 and
  /// checks that only the first buffer is observed and the source subscription ends at 400.
  /// </summary>
  [Fact]
  public void Buffer_Closings_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Every invocation of the selector uses the current multiplier and then bumps it.
      var multiplier = 1;
      Func<IObservable<long>> nextClosing = () =>
      {
          var dueTime = TimeSpan.FromTicks(multiplier * 100);
          multiplier++;
          return Observable.Timer(dueTime, testScheduler);
      };

      var observer = testScheduler.Start(
          () => source.Buffer(nextClosing),
          400
      );

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 }))
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Checks that when the source fails at 590 the timer-closing buffer emits the buffers
  /// closed before that point followed by the same error.
  /// </summary>
  [Fact]
  public void Buffer_Closings_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, error)
      );

      // Every invocation of the selector uses the current multiplier and then bumps it.
      var multiplier = 1;
      Func<IObservable<long>> nextClosing = () =>
      {
          var dueTime = TimeSpan.FromTicks(multiplier * 100);
          multiplier++;
          return Observable.Timer(dueTime, testScheduler);
      };

      var observer = testScheduler.Start(() => source.Buffer(nextClosing));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 5, 6, 7, 8, 9 })),
          OnError<IList<int>>(590, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  /// <summary>
  /// Checks that an exception thrown by the closing selector itself is reported as
  /// OnError at 200 and that the source subscription is recorded as 200-200.
  /// </summary>
  [Fact]
  public void Buffer_Closings_Throw()
  {
      var testScheduler = new TestScheduler();
      var selectorError = new Exception();

      // The source carries its own, distinct exception instance at 590.
      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, new Exception())
      );

      Func<IObservable<int>> throwingSelector = () => { throw selectorError; };

      var observer = testScheduler.Start(() => source.Buffer(throwingSelector));

      observer.Messages.AssertEqual(
          OnError<IList<int>>(200, selectorError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 200)
      );
  }
  /// <summary>
  /// Checks that an error produced by the closing sequence (a scheduled <c>Throw</c>) is
  /// reported as OnError at 201 and ends the source subscription at 201.
  /// </summary>
  [Fact]
  public void Buffer_Closings_WindowClose_Error()
  {
      var testScheduler = new TestScheduler();
      var closingError = new Exception();

      // The source carries its own, distinct exception instance at 590.
      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, new Exception())
      );

      Func<IObservable<int>> failingClosing = () => Observable.Throw<int>(closingError, testScheduler);

      var observer = testScheduler.Start(() => source.Buffer(failingClosing));

      observer.Messages.AssertEqual(
          OnError<IList<int>>(201, closingError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 201)
      );
  }
  /// <summary>
  /// Opens a buffer for every value of the openings sequence and closes it with a timer
  /// whose length is that value; checks the emitted buffers and both subscriptions.
  /// </summary>
  [Fact]
  public void Buffer_OpeningClosings_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var openings = testScheduler.CreateHotObservable(
          OnNext(255, 50),
          OnNext(330, 100),
          OnNext(350, 50),
          OnNext(400, 90),
          OnCompleted<int>(900)
      );

      // Each opening value is used as the tick length of the timer that closes its buffer.
      Func<int, IObservable<long>> closingFor = length => Observable.Timer(TimeSpan.FromTicks(length), testScheduler);

      var observer = testScheduler.Start(() => source.Buffer(openings, closingFor));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(305, buffer => buffer.SequenceEqual(new[] { 4 })),
          OnNext<IList<int>>(400, buffer => buffer.SequenceEqual(new int[0])),
          OnNext<IList<int>>(430, buffer => buffer.SequenceEqual(new[] { 6, 7, 8 })),
          OnNext<IList<int>>(490, buffer => buffer.SequenceEqual(new[] { 7, 8, 9 })),
          OnCompleted<IList<int>>(900)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
#else
      source.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
#endif

      openings.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
  }
  /// <summary>
  /// Splits the source into buffers at each boundary notification and checks that the
  /// final buffer and completion are emitted when the source completes at 590.
  /// </summary>
  [Fact]
  public void Buffer_Boundaries_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = testScheduler.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnNext(400, true),
          OnNext(500, true),
          OnCompleted<bool>(900)
      );

      var observer = testScheduler.Start(() => source.Buffer(boundaries));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(255, buffer => buffer.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, buffer => buffer.SequenceEqual(new[] { 6 })),
          OnNext<IList<int>>(400, buffer => buffer.SequenceEqual(new int[0])),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 7, 8, 9 })),
          OnNext<IList<int>>(590, buffer => buffer.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  /// <summary>
  /// Checks that when the boundary sequence completes at 400 the current (empty) buffer
  /// is emitted, the result completes, and both subscriptions end at 400.
  /// </summary>
  [Fact]
  public void Buffer_Boundaries_OnCompletedBoundaries()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = testScheduler.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnCompleted<bool>(400)
      );

      var observer = testScheduler.Start(() => source.Buffer(boundaries));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(255, buffer => buffer.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, buffer => buffer.SequenceEqual(new[] { 6 })),
          OnNext<IList<int>>(400, buffer => buffer.SequenceEqual(new int[0])),
          OnCompleted<IList<int>>(400)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Checks that a source error at 400 is forwarded after the buffers already emitted
  /// and that both subscriptions end at 400.
  /// </summary>
  [Fact]
  public void Buffer_Boundaries_OnErrorSource()
  {
      var sourceError = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(380, 7),
          OnError<int>(400, sourceError)
      );

      var boundaries = testScheduler.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnCompleted<bool>(500)
      );

      var observer = testScheduler.Start(() => source.Buffer(boundaries));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(255, buffer => buffer.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, buffer => buffer.SequenceEqual(new[] { 6 })),
          OnError<IList<int>>(400, sourceError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Checks that an error in the boundary sequence at 400 is forwarded after the buffers
  /// already emitted and that both subscriptions end at 400.
  /// </summary>
  [Fact]
  public void Buffer_Boundaries_OnErrorBoundaries()
  {
      var boundaryError = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = testScheduler.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnError<bool>(400, boundaryError)
      );

      var observer = testScheduler.Start(() => source.Buffer(boundaries));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(255, buffer => buffer.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, buffer => buffer.SequenceEqual(new[] { 6 })),
          OnError<IList<int>>(400, boundaryError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  476. #endregion
  477. #region + Count +
  /// <summary>
  /// Verifies argument validation of the count-based <c>Buffer</c> overloads: a null source
  /// throws <see cref="ArgumentNullException"/>, non-positive count or skip throws
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void Buffer_Single_ArgumentChecking()
  {
      var emptySource = Observable.Empty<int>();

      // Buffer(source, count)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(emptySource, 0));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(emptySource, -1));

      // Buffer(source, count, skip)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, 1, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(emptySource, 1, 0));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(emptySource, 0, 1));
  }
  /// <summary>
  /// With a count of 5 and only four values observed before completion, checks that the
  /// partial buffer is emitted at completion time (250).
  /// </summary>
  [Fact]
  public void Buffer_Count_PartialWindow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.Buffer(5));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new int[] { 2, 3, 4, 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// With a count of 2 and four observed values, checks that two full buffers are emitted
  /// (at 220 and 240) followed by completion at 250.
  /// </summary>
  [Fact]
  public void Buffer_Count_FullWindows()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.Buffer(2));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(220, buffer => buffer.SequenceEqual(new int[] { 2, 3 })),
          OnNext<IList<int>>(240, buffer => buffer.SequenceEqual(new int[] { 4, 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// With a count of 3 and four observed values, checks that one full buffer is emitted
  /// at 230 and the remaining partial buffer at completion (250).
  /// </summary>
  [Fact]
  public void Buffer_Count_FullAndPartialWindows()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.Buffer(3));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(230, buffer => buffer.SequenceEqual(new[] { 2, 3, 4 })),
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new[] { 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// With a count of 5 and a source that fails at 250 before the buffer fills, checks that
  /// only the error is observed.
  /// </summary>
  [Fact]
  public void Buffer_Count_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnError<int>(250, error)
      );

      var observer = testScheduler.Start(() => source.Buffer(5));

      observer.Messages.AssertEqual(
          OnError<IList<int>>(250, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Uses count 3 with skip 1 (overlapping buffers) and checks the full buffers plus the
  /// two partial buffers emitted at completion (250).
  /// </summary>
  [Fact]
  public void Buffer_Count_Skip_Less()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.Buffer(3, 1));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(230, buffer => buffer.SequenceEqual(new[] { 2, 3, 4 })),
          OnNext<IList<int>>(240, buffer => buffer.SequenceEqual(new[] { 3, 4, 5 })),
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new[] { 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Uses count 2 with skip 3 (gaps between buffers) and checks the full buffer at 220
  /// and the partial buffer emitted at completion (250).
  /// </summary>
  [Fact]
  public void Buffer_Count_Skip_More()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.Buffer(2, 3));

      observer.Messages.AssertEqual(
          OnNext<IList<int>>(220, buffer => buffer.SequenceEqual(new[] { 2, 3 })),
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new[] { 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Verifies argument validation of <c>Buffer(source, count, skip)</c> and
  /// <c>Buffer(source, count)</c> for a null source and for zero count or skip.
  /// </summary>
  [Fact]
  public void BufferWithCount_ArgumentChecking()
  {
      // Buffer(source, count, skip)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, 1, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 1, 0));

      // Buffer(source, count)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0));
  }
  /// <summary>
  /// Uses count 3 with skip 2, renders each buffer as a comma-separated string, and checks
  /// the emitted strings including the partial buffer at completion (600).
  /// </summary>
  [Fact]
  public void BufferWithCount_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(() => source.Buffer(3, 2).Select(toCsv));

      observer.Messages.AssertEqual(
          OnNext(280, "2,3,4"),
          OnNext(350, "4,5,6"),
          OnNext(420, "6,7,8"),
          OnNext(600, "8,9"),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Runs the count 3 / skip 2 buffer with the test scheduler's dispose time set to 370 and
  /// checks that only the buffers emitted before then are observed.
  /// </summary>
  [Fact]
  public void BufferWithCount_Disposed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(
          () => source.Buffer(3, 2).Select(toCsv),
          370
      );

      observer.Messages.AssertEqual(
          OnNext(280, "2,3,4"),
          OnNext(350, "4,5,6")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 370)
      );
  }
  /// <summary>
  /// Uses count 3 with skip 2 on a source that fails at 600 and checks that the full
  /// buffers are followed by the error, with no partial buffer emitted.
  /// </summary>
  [Fact]
  public void BufferWithCount_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, error)
      );

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(() => source.Buffer(3, 2).Select(toCsv));

      observer.Messages.AssertEqual(
          OnNext(280, "2,3,4"),
          OnNext(350, "4,5,6"),
          OnNext(420, "6,7,8"),
          OnError<string>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Checks the second buffer produced from <c>Range(1, 10)</c> by the count-only and
  /// count/skip overloads without supplying a scheduler.
  /// </summary>
  [Fact]
  public void BufferWithCount_Default()
  {
      // Buffer(3): the second buffer is expected to be 4, 5, 6.
      var secondOfCount = Observable.Range(1, 10).Buffer(3).Skip(1).First();
      secondOfCount.AssertEqual(4, 5, 6);

      // Buffer(3, 2): the second buffer is expected to be 3, 4, 5.
      var secondOfCountSkip = Observable.Range(1, 10).Buffer(3, 2).Skip(1).First();
      secondOfCountSkip.AssertEqual(3, 4, 5);
  }
  735. #endregion
  736. #region + Time +
  /// <summary>
  /// Verifies that the time-based <c>Buffer</c> overloads throw
  /// <see cref="ArgumentNullException"/> for a null source or a null scheduler.
  /// </summary>
  [Fact]
  public void Buffer_Time_ArgumentChecking()
  {
      var testScheduler = new TestScheduler();
      var emptySource = Observable.Empty<int>();

      // Buffer(source, timeSpan[, scheduler])
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(emptySource, TimeSpan.Zero, (IScheduler)null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, TimeSpan.Zero, testScheduler));

      // Buffer(source, timeSpan, timeShift[, scheduler])
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, TimeSpan.Zero, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(emptySource, TimeSpan.Zero, TimeSpan.Zero, (IScheduler)null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, TimeSpan.Zero, TimeSpan.Zero, testScheduler));
  }
  /// <summary>
  /// Verifies argument validation of the time-based <c>Buffer</c> overloads: null source or
  /// scheduler throws <see cref="ArgumentNullException"/>, a negative time span or time shift
  /// throws <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void BufferWithTime_ArgumentChecking()
  {
      var oneTick = TimeSpan.FromTicks(1);
      var minusOneTick = TimeSpan.FromTicks(-1);

      // Buffer(source, timeSpan, timeShift, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, oneTick, oneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, minusOneTick, oneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, minusOneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, oneTick, (IScheduler)null));

      // Buffer(source, timeSpan, timeShift)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, oneTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, minusOneTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, minusOneTick));

      // Buffer(source, timeSpan, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, oneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, minusOneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, (IScheduler)null));

      // Buffer(source, timeSpan)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer((IObservable<int>)null, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, minusOneTick));
  }
  /// <summary>
  /// Uses a 100-tick time span with a 70-tick shift (overlapping buffers), renders each
  /// buffer as a comma-separated string, and checks the emitted strings up to completion at 600.
  /// </summary>
  [Fact]
  public void BufferWithTime_Basic1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var timeSpan = TimeSpan.FromTicks(100);
      var timeShift = TimeSpan.FromTicks(70);

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(() =>
          source.Buffer(timeSpan, timeShift, testScheduler).Select(toCsv)
      );

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(370, "4,5,6"),
          OnNext(440, "6,7,8"),
          OnNext(510, "8,9"),
          OnNext(580, ""),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Uses a 70-tick time span with a 100-tick shift (gaps between buffers), renders each
  /// buffer as a comma-separated string, and checks the emitted strings up to completion at 600.
  /// </summary>
  [Fact]
  public void BufferWithTime_Basic2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var timeSpan = TimeSpan.FromTicks(70);
      var timeShift = TimeSpan.FromTicks(100);

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(() =>
          source.Buffer(timeSpan, timeShift, testScheduler).Select(toCsv)
      );

      observer.Messages.AssertEqual(
          OnNext(270, "2,3"),
          OnNext(370, "5,6"),
          OnNext(470, "8,9"),
          OnNext(570, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Uses a 100-tick time span with a 70-tick shift on a source that fails at 600 and
  /// checks that the buffers emitted before then are followed by the error.
  /// </summary>
  [Fact]
  public void BufferWithTime_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, error)
      );

      var timeSpan = TimeSpan.FromTicks(100);
      var timeShift = TimeSpan.FromTicks(70);

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(() =>
          source.Buffer(timeSpan, timeShift, testScheduler).Select(toCsv)
      );

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(370, "4,5,6"),
          OnNext(440, "6,7,8"),
          OnNext(510, "8,9"),
          OnNext(580, ""),
          OnError<string>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Runs the 100-tick / 70-tick-shift buffer with the test scheduler's dispose time set to
  /// 370 and checks that only the buffer emitted at 300 is observed.
  /// </summary>
  [Fact]
  public void BufferWithTime_Disposed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var timeSpan = TimeSpan.FromTicks(100);
      var timeShift = TimeSpan.FromTicks(70);

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(
          () => source.Buffer(timeSpan, timeShift, testScheduler).Select(toCsv),
          370
      );

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 370)
      );
  }
  /// <summary>
  /// Uses the single-time-span overload (100 ticks) on a <c>TestScheduler</c>, renders each
  /// buffer as a comma-separated string, and checks the emitted strings up to completion at 600.
  /// </summary>
  [Fact]
  public void BufferWithTime_Basic_Same()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var timeSpan = TimeSpan.FromTicks(100);

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(() =>
          source.Buffer(timeSpan, testScheduler).Select(toCsv)
      );

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnNext(500, "8,9"),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Same scenario as the single-time-span test but on a <c>PeriodicTestScheduler</c>;
  /// additionally checks the timer run recorded by the scheduler (outside WINDOWS builds).
  /// </summary>
  [Fact]
  public void BufferWithTime_Basic_Same_Periodic()
  {
      var periodicScheduler = new PeriodicTestScheduler();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var timeSpan = TimeSpan.FromTicks(100);

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = periodicScheduler.Start(() =>
          source.Buffer(timeSpan, periodicScheduler).Select(toCsv)
      );

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnNext(500, "8,9"),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );

#if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 600) { 300, 400, 500 }
      );
#endif
  }
  /// <summary>
  /// Single-time-span buffer on a <c>PeriodicTestScheduler</c> with a source that fails at
  /// 480; checks the emitted strings, the error, and (outside WINDOWS builds) the recorded timer run.
  /// </summary>
  [Fact]
  public void BufferWithTime_Basic_Same_Periodic_Error()
  {
      var error = new Exception();
      var periodicScheduler = new PeriodicTestScheduler();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(480, error)
      );

      var timeSpan = TimeSpan.FromTicks(100);

      // Renders a buffer as "a,b,c" so it can be compared by value.
      Func<IList<int>, string> toCsv = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());

      var observer = periodicScheduler.Start(() =>
          source.Buffer(timeSpan, periodicScheduler).Select(toCsv)
      );

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnError<string>(480, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

#if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 480) { 300, 400 }
      );
#endif
  }
  // Exercises the time-based Buffer overloads that take no scheduler argument: with a one-day
  // time span, the first buffer of Range(0, 10) is expected to hold all ten values.
  [Fact]
  public void BufferWithTime_Default()
  {
      var oneDay = TimeSpan.FromDays(1);
      var expected = Enumerable.Range(0, 10);

      // Overload with both a time span and a time shift.
      var firstWithShift = Observable.Range(0, 10).Buffer(oneDay, oneDay).First();
      firstWithShift.AssertEqual(expected);

      // Overload with a time span only.
      var firstWithoutShift = Observable.Range(0, 10).Buffer(oneDay).First();
      firstWithoutShift.AssertEqual(expected);
  }
  // Verifies argument validation of the time-or-count Buffer overloads,
  // both with and without an explicit scheduler argument.
  [Fact]
  public void BufferWithTimeOrCount_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      var validSource = DummyObservable<int>.Instance;
      var oneTick = TimeSpan.FromTicks(1);
      var negativeTick = TimeSpan.FromTicks(-1);

      // Overloads taking an explicit scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick, 1, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(validSource, negativeTick, 1, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(validSource, oneTick, 0, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(validSource, oneTick, 1, null));

      // Overloads without a scheduler argument.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(validSource, negativeTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(validSource, oneTick, 0));
  }
  // Buffers a hot source with a 70-tick time span and a count limit of 3 on the test
  // scheduler, then checks the produced buffers and the source subscription.
  [Fact]
  public void BufferWithTimeOrCount_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Each buffer is rendered as a comma-separated string so it can be compared by value.
      Func<IList<int>, string> render = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());
      var window = TimeSpan.FromTicks(70);
      const int maxCount = 3;

      var observer = testScheduler.Start(() =>
      {
          var buffers = source.Buffer(window, maxCount, testScheduler);
          return buffers.Select(render);
      });

      observer.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7"),
          OnNext(440, "8"),
          OnNext(510, "9"),
          OnNext(580, ""),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Same setup as the basic time-or-count Buffer test, but the source ends with an error
  // at tick 600; checks that the error is forwarded after the buffers already produced.
  [Fact]
  public void BufferWithTimeOrCount_Error()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, failure)
      );

      // Each buffer is rendered as a comma-separated string so it can be compared by value.
      Func<IList<int>, string> render = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());
      var window = TimeSpan.FromTicks(70);
      const int maxCount = 3;

      var observer = testScheduler.Start(() =>
      {
          var buffers = source.Buffer(window, maxCount, testScheduler);
          return buffers.Select(render);
      });

      // Unlike the completion case, no buffer is expected at 600, only the error.
      observer.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7"),
          OnNext(440, "8"),
          OnNext(510, "9"),
          OnNext(580, ""),
          OnError<string>(600, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Runs the time-or-count Buffer pipeline with the subscription disposed at tick 370 and
  // checks that only the buffers up to that point are observed.
  [Fact]
  public void BufferWithTimeOrCount_Disposed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Each buffer is rendered as a comma-separated string so it can be compared by value.
      Func<IList<int>, string> render = buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray());
      var window = TimeSpan.FromTicks(70);
      const int maxCount = 3;
      const long disposeAt = 370;

      var observer = testScheduler.Start(
          () => source.Buffer(window, maxCount, testScheduler).Select(render),
          disposeAt
      );

      observer.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 370)
      );
  }
  // Exercises the time-or-count Buffer overload that takes no scheduler argument: with a
  // one-day time span and a count of 3, the second buffer of Range(1, 10) is expected to be 4, 5, 6.
  [Fact]
  public void BufferWithTimeOrCount_Default()
  {
      var numbers = Observable.Range(1, 10, DefaultScheduler.Instance);
      var buffers = numbers.Buffer(TimeSpan.FromDays(1), 3);

      // Skip the first buffer and take the one that follows it.
      var secondBuffer = buffers.Skip(1).First();

      secondBuffer.AssertEqual(4, 5, 6);
  }
  1102. #endregion
  1103. }
  1104. }