BufferTest.cs 43 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class BufferTest : ReactiveTest
  22. {
  23. #region + Boundary +
  // Checks that the boundary-driven Buffer overloads throw ArgumentNullException
  // when the source, the boundary/opening sequence, or a closing selector is null.
  [Fact]
  public void Buffer_ArgumentChecking()
  {
      IObservable<int> nullSequence = null;
      Func<IObservable<int>> nullClosingFactory = null;
      Func<int, IObservable<int>> nullClosingSelector = null;

      var dummySequence = DummyObservable<int>.Instance;
      var dummyClosingFactory = DummyFunc<IObservable<int>>.Instance;
      var dummyClosingSelector = DummyFunc<int, IObservable<int>>.Instance;

      // Overload taking a closing factory.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, dummyClosingFactory));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummySequence, nullClosingFactory));

      // Overload taking an opening sequence plus a closing selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, dummySequence, dummyClosingSelector));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummySequence, nullSequence, dummyClosingSelector));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummySequence, dummySequence, nullClosingSelector));

      // Overload taking a boundary sequence.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, dummySequence));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummySequence, nullSequence));
  }
  // Buffers a hot source using a closing factory that returns timers of growing
  // length (100 ticks, then 200, then 300, ...) and checks the emitted buffers.
  [Fact]
  public void Buffer_Closings_Basic()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Multiplier for the next timer; bumped every time the factory is invoked.
      var timerFactor = 1;

      var observed = clock.Start(() =>
          source.Buffer(() => Observable.Timer(TimeSpan.FromTicks(timerFactor++ * 100), clock))
      );

      // Expected closes at 300 and 500, with the remainder flushed on completion at 590.
      observed.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 5, 6, 7, 8, 9 })),
          OnNext<IList<int>>(590, buffer => buffer.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  // Uses four pre-built hot closing sequences, handed out one per buffer, and checks
  // both the emitted buffers and the subscription interval of each closing sequence.
  [Fact]
  public void Buffer_Closings_InnerSubscriptions()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var closingSequences = new ITestableObservable<bool>[]
      {
          // First closing sequence: first value at 300.
          clock.CreateHotObservable(
              OnNext(300, true),
              OnNext(350, false),
              OnCompleted<bool>(380)
          ),
          // Second closing sequence: first value at 400.
          clock.CreateHotObservable(
              OnNext(400, true),
              OnNext(510, false),
              OnNext(620, false)
          ),
          // Third closing sequence: no values, only completion at 500.
          clock.CreateHotObservable(
              OnCompleted<bool>(500)
          ),
          // Fourth closing sequence: its only value (600) comes after the source completes.
          clock.CreateHotObservable(
              OnNext(600, true)
          )
      };

      // Index of the next closing sequence to hand out.
      var nextClosing = 0;

      var observed = clock.Start(() =>
          source.Buffer(() => closingSequences[nextClosing++])
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(400, buffer => buffer.SequenceEqual(new[] { 5, 6 })),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 7, 8, 9 })),
          OnNext<IList<int>>(590, buffer => buffer.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );

      // Each closing sequence is expected to be subscribed only while its buffer is open.
      closingSequences[0].Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      closingSequences[1].Subscriptions.AssertEqual(
          Subscribe(300, 400)
      );

      closingSequences[2].Subscriptions.AssertEqual(
          Subscribe(400, 500)
      );

      closingSequences[3].Subscriptions.AssertEqual(
          Subscribe(500, 590)
      );
  }
  // Same scenario as the timer-based closing test, but each closing sequence is an
  // empty sequence passed through Delay with a growing due time (100, 200, ... ticks).
  [Fact]
  public void Buffer_Closings_Empty()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Multiplier for the next delay; bumped every time the factory is invoked.
      var delayFactor = 1;

      var observed = clock.Start(() =>
          source.Buffer(() => Observable.Empty<int>().Delay(TimeSpan.FromTicks(delayFactor++ * 100), clock))
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 5, 6, 7, 8, 9 })),
          OnNext<IList<int>>(590, buffer => buffer.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  // Runs the timer-based closing scenario but passes 400 as the extra argument to
  // Start (presumably the disposal time — the source subscription is asserted to end
  // at 400), so only the first buffer is expected.
  [Fact]
  public void Buffer_Closings_Dispose()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      // Multiplier for the next timer; bumped every time the factory is invoked.
      var timerFactor = 1;

      var observed = clock.Start(
          () => source.Buffer(() => Observable.Timer(TimeSpan.FromTicks(timerFactor++ * 100), clock)),
          400
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 }))
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  // The source fails at 590; the test expects the two buffers closed before that
  // point, followed by the same exception instance, with no final partial buffer.
  [Fact]
  public void Buffer_Closings_Error()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, failure)
      );

      // Multiplier for the next timer; bumped every time the factory is invoked.
      var timerFactor = 1;

      var observed = clock.Start(() =>
          source.Buffer(() => Observable.Timer(TimeSpan.FromTicks(timerFactor++ * 100), clock))
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 3, 4 })),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 5, 6, 7, 8, 9 })),
          OnError<IList<int>>(590, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  // The closing factory itself throws; the test expects that exception to surface
  // at 200 and the source subscription to span 200-200.
  [Fact]
  public void Buffer_Closings_Throw()
  {
      var clock = new TestScheduler();

      // Exception thrown by the closing factory. The source's own error below is a
      // different instance, so the assertion can tell the two apart.
      var factoryFailure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, new Exception())
      );

      var observed = clock.Start(() =>
          source.Buffer<int, int>(() => { throw factoryFailure; })
      );

      observed.Messages.AssertEqual(
          OnError<IList<int>>(200, factoryFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 200)
      );
  }
  // The closing sequence is Observable.Throw scheduled on the test scheduler; the
  // test expects its exception at 201 and the source subscription to span 200-201.
  [Fact]
  public void Buffer_Closings_WindowClose_Error()
  {
      var clock = new TestScheduler();

      // Exception raised by the closing sequence. The source's own error below is a
      // different instance, so the assertion can tell the two apart.
      var closingFailure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnError<int>(590, new Exception())
      );

      var observed = clock.Start(() =>
          source.Buffer(() => Observable.Throw<int>(closingFailure, clock))
      );

      observed.Messages.AssertEqual(
          OnError<IList<int>>(201, closingFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 201)
      );
  }
  // Each value of the opening sequence opens a buffer and is also used as the length
  // (in ticks) of the timer that closes it: 255+50=305, 350+50=400, 330+100=430,
  // 400+90=490.
  [Fact]
  public void Buffer_OpeningClosings_Basic()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var openings = clock.CreateHotObservable(
          OnNext(255, 50),
          OnNext(330, 100),
          OnNext(350, 50),
          OnNext(400, 90),
          OnCompleted<int>(900)
      );

      var observed = clock.Start(() =>
          source.Buffer(openings, duration => Observable.Timer(TimeSpan.FromTicks(duration), clock))
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(305, buffer => buffer.SequenceEqual(new[] { 4 })),
          OnNext<IList<int>>(400, buffer => buffer.SequenceEqual(new int[0])),
          OnNext<IList<int>>(430, buffer => buffer.SequenceEqual(new[] { 6, 7, 8 })),
          OnNext<IList<int>>(490, buffer => buffer.SequenceEqual(new[] { 7, 8, 9 })),
          OnCompleted<IList<int>>(900)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
#else
      source.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
#endif

      openings.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
  }
  // Buffers against a boundary sequence: a buffer is expected at every boundary
  // value, plus a final one when the source completes at 590.
  [Fact]
  public void Buffer_Boundaries_Simple()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = clock.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnNext(400, true),
          OnNext(500, true),
          OnCompleted<bool>(900)
      );

      var observed = clock.Start(() =>
          source.Buffer(boundaries)
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(255, buffer => buffer.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, buffer => buffer.SequenceEqual(new[] { 6 })),
          OnNext<IList<int>>(400, buffer => buffer.SequenceEqual(new int[0])),
          OnNext<IList<int>>(500, buffer => buffer.SequenceEqual(new[] { 7, 8, 9 })),
          OnNext<IList<int>>(590, buffer => buffer.SequenceEqual(new[] { 10 })),
          OnCompleted<IList<int>>(590)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );

      // The boundary subscription is expected to end with the source (590), not at 900.
      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 590)
      );
  }
  // The boundary sequence completes at 400, before the source does; the test expects
  // an empty buffer and completion at 400, and both subscriptions ending at 400.
  [Fact]
  public void Buffer_Boundaries_OnCompletedBoundaries()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = clock.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnCompleted<bool>(400)
      );

      var observed = clock.Start(() =>
          source.Buffer(boundaries)
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(255, buffer => buffer.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, buffer => buffer.SequenceEqual(new[] { 6 })),
          OnNext<IList<int>>(400, buffer => buffer.SequenceEqual(new int[0])),
          OnCompleted<IList<int>>(400)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  // The source fails at 400; the test expects the buffers closed by earlier boundary
  // values and then the same exception, with value 7 (at 380) never delivered.
  [Fact]
  public void Buffer_Boundaries_OnErrorSource()
  {
      var failure = new Exception();
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(380, 7),
          OnError<int>(400, failure)
      );

      var boundaries = clock.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnCompleted<bool>(500)
      );

      var observed = clock.Start(() =>
          source.Buffer(boundaries)
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(255, buffer => buffer.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, buffer => buffer.SequenceEqual(new[] { 6 })),
          OnError<IList<int>>(400, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  // The boundary sequence fails at 400; the test expects the buffers closed by
  // earlier boundary values, then that exception, and both subscriptions ending at 400.
  [Fact]
  public void Buffer_Boundaries_OnErrorBoundaries()
  {
      var failure = new Exception();
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(90, 1),
          OnNext(180, 2),
          OnNext(250, 3),
          OnNext(260, 4),
          OnNext(310, 5),
          OnNext(340, 6),
          OnNext(410, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnNext(550, 10),
          OnCompleted<int>(590)
      );

      var boundaries = clock.CreateHotObservable(
          OnNext(255, true),
          OnNext(330, true),
          OnNext(350, true),
          OnError<bool>(400, failure)
      );

      var observed = clock.Start(() =>
          source.Buffer(boundaries)
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(255, buffer => buffer.SequenceEqual(new[] { 3 })),
          OnNext<IList<int>>(330, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(350, buffer => buffer.SequenceEqual(new[] { 6 })),
          OnError<IList<int>>(400, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      boundaries.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  483. #endregion
  484. #region + Count +
  // Argument validation for the count-based overloads: a null source must raise
  // ArgumentNullException; zero or negative count/skip must raise ArgumentOutOfRangeException.
  [Fact]
  public void Buffer_Single_ArgumentChecking()
  {
      IObservable<int> nullSequence = null;
      var emptySequence = Observable.Empty<int>();

      // Buffer(source, count)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(emptySequence, 0));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(emptySequence, -1));

      // Buffer(source, count, skip)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, 1, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(emptySequence, 1, 0));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(emptySequence, 0, 1));
  }
  // Buffer size 5 with only four observed values: the test expects a single partial
  // buffer delivered when the source completes at 250.
  [Fact]
  public void Buffer_Count_PartialWindow()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observed = clock.Start(() =>
          source.Buffer(5)
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new int[] { 2, 3, 4, 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  // Buffer size 2 with four observed values: the test expects two full buffers and
  // no extra buffer on completion.
  [Fact]
  public void Buffer_Count_FullWindows()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observed = clock.Start(() =>
          source.Buffer(2)
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(220, buffer => buffer.SequenceEqual(new int[] { 2, 3 })),
          OnNext<IList<int>>(240, buffer => buffer.SequenceEqual(new int[] { 4, 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  // Buffer size 3 with four observed values: the test expects one full buffer at 230
  // and a one-element remainder delivered on completion at 250.
  [Fact]
  public void Buffer_Count_FullAndPartialWindows()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observed = clock.Start(() =>
          source.Buffer(3)
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(230, buffer => buffer.SequenceEqual(new[] { 2, 3, 4 })),
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new[] { 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  // The source fails at 250 before a buffer of size 5 fills: the test expects only
  // the error, with no partial buffer.
  [Fact]
  public void Buffer_Count_Error()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnError<int>(250, failure)
      );

      var observed = clock.Start(() =>
          source.Buffer(5)
      );

      observed.Messages.AssertEqual(
          OnError<IList<int>>(250, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  // Count 3 with skip 1 (overlapping buffers): the test expects two full buffers and
  // then the two still-open partial buffers delivered on completion at 250.
  [Fact]
  public void Buffer_Count_Skip_Less()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observed = clock.Start(() =>
          source.Buffer(3, 1)
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(230, buffer => buffer.SequenceEqual(new[] { 2, 3, 4 })),
          OnNext<IList<int>>(240, buffer => buffer.SequenceEqual(new[] { 3, 4, 5 })),
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new[] { 4, 5 })),
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new[] { 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  // Count 2 with skip 3 (gaps between buffers): value 4 is expected in no buffer, and
  // the buffer started by value 5 is delivered partially on completion at 250.
  [Fact]
  public void Buffer_Count_Skip_More()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnCompleted<int>(250)
      );

      var observed = clock.Start(() =>
          source.Buffer(2, 3)
      );

      observed.Messages.AssertEqual(
          OnNext<IList<int>>(220, buffer => buffer.SequenceEqual(new[] { 2, 3 })),
          OnNext<IList<int>>(250, buffer => buffer.SequenceEqual(new[] { 5 })),
          OnCompleted<IList<int>>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  // Argument validation for Buffer(source, count, skip) and Buffer(source, count),
  // using the dummy observable as the non-null source.
  [Fact]
  public void BufferWithCount_ArgumentChecking()
  {
      IObservable<int> nullSequence = null;
      var dummySequence = DummyObservable<int>.Instance;

      // Buffer(source, count, skip)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, 1, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummySequence, 0, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummySequence, 1, 0));

      // Buffer(source, count)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummySequence, 0));
  }
  // Count 3 / skip 2 over a hot source; each buffer is rendered as a comma-separated
  // string so the expected messages can be compared directly.
  [Fact]
  public void BufferWithCount_Basic()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observed = clock.Start(() =>
          source.Buffer(3, 2)
                .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray()))
      );

      // The last buffer ("8,9") is partial and is expected on completion at 600.
      observed.Messages.AssertEqual(
          OnNext(280, "2,3,4"),
          OnNext(350, "4,5,6"),
          OnNext(420, "6,7,8"),
          OnNext(600, "8,9"),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Count 3 / skip 2 with 370 passed as the extra argument to Start (presumably the
  // disposal time — the source subscription is asserted to end at 370), so only the
  // first two buffers are expected.
  [Fact]
  public void BufferWithCount_Disposed()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observed = clock.Start(
          () => source.Buffer(3, 2)
                      .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray())),
          370
      );

      observed.Messages.AssertEqual(
          OnNext(280, "2,3,4"),
          OnNext(350, "4,5,6")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 370)
      );
  }
  // Count 3 / skip 2 with a source that fails at 600: the full buffers are expected,
  // then the error, with no partial "8,9" buffer.
  [Fact]
  public void BufferWithCount_Error()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, failure)
      );

      var observed = clock.Start(() =>
          source.Buffer(3, 2)
                .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray()))
      );

      observed.Messages.AssertEqual(
          OnNext(280, "2,3,4"),
          OnNext(350, "4,5,6"),
          OnNext(420, "6,7,8"),
          OnError<string>(600, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Runs the count-based overloads without a test scheduler and inspects the second
  // buffer produced from Range(1, 10).
  [Fact]
  public void BufferWithCount_Default()
  {
      // Non-overlapping buffers of 3: the second buffer is 4, 5, 6.
      var secondOfPlain = Observable.Range(1, 10).Buffer(3).Skip(1).First();
      secondOfPlain.AssertEqual(4, 5, 6);

      // Count 3, skip 2: the second buffer starts two elements later, i.e. 3, 4, 5.
      var secondOfOverlapping = Observable.Range(1, 10).Buffer(3, 2).Skip(1).First();
      secondOfOverlapping.AssertEqual(3, 4, 5);
  }
  742. #endregion
  743. #region + Time +
  // Null checks for the time-based overloads: a null source or a null scheduler must
  // raise ArgumentNullException.
  [Fact]
  public void Buffer_Time_ArgumentChecking()
  {
      var clock = new TestScheduler();
      var emptySequence = Observable.Empty<int>();
      IObservable<int> nullSequence = null;

      // Buffer(source, timeSpan [, scheduler])
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(emptySequence, TimeSpan.Zero, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, TimeSpan.Zero, clock));

      // Buffer(source, timeSpan, timeShift [, scheduler])
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, TimeSpan.Zero, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(emptySequence, TimeSpan.Zero, TimeSpan.Zero, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, TimeSpan.Zero, TimeSpan.Zero, clock));
  }
  // Argument validation for every time-based overload: null source or scheduler must
  // raise ArgumentNullException; a negative time span or time shift must raise
  // ArgumentOutOfRangeException.
  [Fact]
  public void BufferWithTime_ArgumentChecking()
  {
      IObservable<int> nullSequence = null;
      var dummySequence = DummyObservable<int>.Instance;
      var dummyScheduler = DummyScheduler.Instance;
      var oneTick = TimeSpan.FromTicks(1);
      var minusOneTick = TimeSpan.FromTicks(-1);

      // Buffer(source, timeSpan, timeShift, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, oneTick, oneTick, dummyScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummySequence, minusOneTick, oneTick, dummyScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummySequence, oneTick, minusOneTick, dummyScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummySequence, oneTick, oneTick, null));

      // Buffer(source, timeSpan, timeShift)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, oneTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummySequence, minusOneTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummySequence, oneTick, minusOneTick));

      // Buffer(source, timeSpan, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, oneTick, dummyScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummySequence, minusOneTick, dummyScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(dummySequence, oneTick, null));

      // Buffer(source, timeSpan)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSequence, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(dummySequence, minusOneTick));
  }
  // Time span 100 with time shift 70 (overlapping buffers); each buffer is rendered
  // as a comma-separated string for comparison.
  [Fact]
  public void BufferWithTime_Basic1()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observed = clock.Start(() =>
          source.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), clock)
                .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray()))
      );

      // Two empty buffers are expected at the end: one at 580 and one on completion at 600.
      observed.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(370, "4,5,6"),
          OnNext(440, "6,7,8"),
          OnNext(510, "8,9"),
          OnNext(580, ""),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Time span 70 with time shift 100 (gaps between buffers): values 4 and 7 are
  // expected in no buffer.
  [Fact]
  public void BufferWithTime_Basic2()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observed = clock.Start(() =>
          source.Buffer(TimeSpan.FromTicks(70), TimeSpan.FromTicks(100), clock)
                .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray()))
      );

      observed.Messages.AssertEqual(
          OnNext(270, "2,3"),
          OnNext(370, "5,6"),
          OnNext(470, "8,9"),
          OnNext(570, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Time span 100 / time shift 70 with a source that fails at 600: the buffers closed
  // before the failure are expected, then the same exception instance.
  [Fact]
  public void BufferWithTime_Error()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, failure)
      );

      var observed = clock.Start(() =>
          source.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), clock)
                .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray()))
      );

      observed.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(370, "4,5,6"),
          OnNext(440, "6,7,8"),
          OnNext(510, "8,9"),
          OnNext(580, ""),
          OnError<string>(600, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Time span 100 / time shift 70 with 370 passed as the extra argument to Start
  // (presumably the disposal time — the source subscription is asserted to end at
  // 370); only the buffer closed at 300 is expected.
  [Fact]
  public void BufferWithTime_Disposed()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observed = clock.Start(
          () => source.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), clock)
                      .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray())),
          370
      );

      observed.Messages.AssertEqual(
          OnNext(300, "2,3,4")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 370)
      );
  }
  // Single-time-span overload (100 ticks): consecutive, non-overlapping buffers are
  // expected, including an empty one on completion at 600.
  [Fact]
  public void BufferWithTime_Basic_Same()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observed = clock.Start(() =>
          source.Buffer(TimeSpan.FromTicks(100), clock)
                .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray()))
      );

      observed.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnNext(500, "8,9"),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Same scenario as the single-time-span test, but run on PeriodicTestScheduler so
  // the recorded timer run can be asserted as well (outside WINDOWS builds).
  [Fact]
  public void BufferWithTime_Basic_Same_Periodic()
  {
      var clock = new PeriodicTestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observed = clock.Start(() =>
          source.Buffer(TimeSpan.FromTicks(100), clock)
                .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray()))
      );

      observed.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnNext(500, "8,9"),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );

#if !WINDOWS
      // One timer run spanning 200-600 with entries 300, 400 and 500 is expected.
      clock.Timers.AssertEqual(
          new TimerRun(200, 600) { 300, 400, 500 }
      );
#endif
  }
  // Periodic-scheduler variant where the source fails at 480: two buffers are
  // expected, then the error, and the recorded timer run is expected to end at 480.
  [Fact]
  public void BufferWithTime_Basic_Same_Periodic_Error()
  {
      var failure = new Exception();
      var clock = new PeriodicTestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(480, failure)
      );

      var observed = clock.Start(() =>
          source.Buffer(TimeSpan.FromTicks(100), clock)
                .Select(buffer => string.Join(",", buffer.Select(item => item.ToString()).ToArray()))
      );

      observed.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnError<string>(480, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

#if !WINDOWS
      // One timer run spanning 200-480 with entries 300 and 400 is expected.
      clock.Timers.AssertEqual(
          new TimerRun(200, 480) { 300, 400 }
      );
#endif
  }
  /// <summary>
  /// Exercises the time-based Buffer overloads that take no scheduler argument: with a
  /// one-day span, the first buffer of a ten-element range is asserted to hold all ten values.
  /// </summary>
  [Fact]
  public void BufferWithTime_Default()
  {
      var oneDay = TimeSpan.FromDays(1);
      var expected = Enumerable.Range(0, 10);

      // Overload taking both a time span and a time shift.
      var firstWithShift = Observable.Range(0, 10).Buffer(oneDay, oneDay).First();
      firstWithShift.AssertEqual(expected);

      // Overload taking only a time span.
      var firstWithoutShift = Observable.Range(0, 10).Buffer(oneDay).First();
      firstWithoutShift.AssertEqual(expected);
  }
  /// <summary>
  /// Asserts the argument validation of the Buffer(timeSpan, count[, scheduler]) overloads:
  /// a null source or scheduler yields <see cref="ArgumentNullException"/>, while a negative
  /// time span or a zero count yields <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void BufferWithTimeOrCount_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var dummyScheduler = DummyScheduler.Instance;
      var oneTick = TimeSpan.FromTicks(1);
      var negativeTick = TimeSpan.FromTicks(-1);

      // Overload with an explicit scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Buffer(default(IObservable<int>), oneTick, 1, dummyScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Buffer(source, negativeTick, 1, dummyScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Buffer(source, oneTick, 0, dummyScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Buffer(source, oneTick, 1, null));

      // Overload without a scheduler argument.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Buffer(default(IObservable<int>), oneTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Buffer(source, negativeTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(
          () => Observable.Buffer(source, oneTick, 0));
  }
  /// <summary>
  /// Buffers a hot source with a 70-tick time span and a count of 3 on a virtual-time
  /// scheduler, and asserts the exact sequence of buffers and the subscription window.
  /// </summary>
  [Fact]
  public void BufferWithTimeOrCount_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Renders each buffer as a comma-separated string so it can be compared by value.
      Func<IList<int>, string> render =
          items => string.Join(",", items.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(() =>
          source.Buffer(TimeSpan.FromTicks(70), 3, testScheduler).Select(render)
      );

      // Expected buffers hold at most three items; empty buffers are expected at 580 and 600.
      observer.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7"),
          OnNext(440, "8"),
          OnNext(510, "9"),
          OnNext(580, ""),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Same input as the basic time-or-count test, but the source terminates with an error
  /// at tick 600. Asserts that the buffers emitted before the failure are followed by the
  /// error, with no buffer expected at tick 600.
  /// </summary>
  [Fact]
  public void BufferWithTimeOrCount_Error()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, failure)
      );

      // Renders each buffer as a comma-separated string so it can be compared by value.
      Func<IList<int>, string> render =
          items => string.Join(",", items.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(() =>
          source.Buffer(TimeSpan.FromTicks(70), 3, testScheduler).Select(render)
      );

      observer.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7"),
          OnNext(440, "8"),
          OnNext(510, "9"),
          OnNext(580, ""),
          OnError<string>(600, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Runs the time-or-count buffer with 370 passed as the second argument to
  /// <c>Start</c>, and asserts that only the buffers up to and including tick 370 are
  /// observed and that the source subscription ends at 370.
  /// </summary>
  [Fact]
  public void BufferWithTimeOrCount_Disposed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Renders each buffer as a comma-separated string so it can be compared by value.
      Func<IList<int>, string> render =
          items => string.Join(",", items.Select(item => item.ToString()).ToArray());

      var observer = testScheduler.Start(
          () => source.Buffer(TimeSpan.FromTicks(70), 3, testScheduler).Select(render),
          370
      );

      // The buffer completed by the third item at tick 370 is still expected in the output.
      observer.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 370)
      );
  }
  /// <summary>
  /// Exercises the Buffer(timeSpan, count) overload that takes no scheduler argument:
  /// with a one-day span and a count of 3, the second buffer of the range 1..10 is
  /// asserted to be 4, 5, 6.
  /// </summary>
  [Fact]
  public void BufferWithTimeOrCount_Default()
  {
      var source = Observable.Range(1, 10, DefaultScheduler.Instance);

      // Skip the first buffer and block until the second one is available.
      var secondBuffer = source.Buffer(TimeSpan.FromDays(1), 3).Skip(1).First();

      secondBuffer.AssertEqual(4, 5, 6);
  }
  1109. #endregion
  1110. }
  1111. }