TakeLastBufferTest.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using ReactiveTests.Dummies;
  12. using Xunit;
  13. namespace ReactiveTests.Tests
  14. {
  15. public class TakeLastBufferTest : ReactiveTest
  16. {
  17. #region + Count +
  18. [Fact]
  19. public void TakeLastBuffer_ArgumentChecking()
  20. {
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer<int>(null, 0));
  22. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(DummyObservable<int>.Instance, -1));
  23. }
  24. [Fact]
  25. public void TakeLastBuffer_Zero_Completed()
  26. {
  27. var scheduler = new TestScheduler();
  28. var xs = scheduler.CreateHotObservable(
  29. OnNext(180, 1),
  30. OnNext(210, 2),
  31. OnNext(250, 3),
  32. OnNext(270, 4),
  33. OnNext(310, 5),
  34. OnNext(360, 6),
  35. OnNext(380, 7),
  36. OnNext(410, 8),
  37. OnNext(590, 9),
  38. OnCompleted<int>(650)
  39. );
  40. var res = scheduler.Start(() =>
  41. xs.TakeLastBuffer(0)
  42. );
  43. res.Messages.AssertEqual(
  44. OnNext<IList<int>>(650, lst => lst.Count == 0),
  45. OnCompleted<IList<int>>(650)
  46. );
  47. xs.Subscriptions.AssertEqual(
  48. Subscribe(200, 650)
  49. );
  50. }
  51. [Fact]
  52. public void TakeLastBuffer_Zero_Error()
  53. {
  54. var scheduler = new TestScheduler();
  55. var ex = new Exception();
  56. var xs = scheduler.CreateHotObservable(
  57. OnNext(180, 1),
  58. OnNext(210, 2),
  59. OnNext(250, 3),
  60. OnNext(270, 4),
  61. OnNext(310, 5),
  62. OnNext(360, 6),
  63. OnNext(380, 7),
  64. OnNext(410, 8),
  65. OnNext(590, 9),
  66. OnError<int>(650, ex)
  67. );
  68. var res = scheduler.Start(() =>
  69. xs.TakeLastBuffer(0)
  70. );
  71. res.Messages.AssertEqual(
  72. OnError<IList<int>>(650, ex)
  73. );
  74. xs.Subscriptions.AssertEqual(
  75. Subscribe(200, 650)
  76. );
  77. }
  78. [Fact]
  79. public void TakeLastBuffer_Zero_Disposed()
  80. {
  81. var scheduler = new TestScheduler();
  82. var xs = scheduler.CreateHotObservable(
  83. OnNext(180, 1),
  84. OnNext(210, 2),
  85. OnNext(250, 3),
  86. OnNext(270, 4),
  87. OnNext(310, 5),
  88. OnNext(360, 6),
  89. OnNext(380, 7),
  90. OnNext(410, 8),
  91. OnNext(590, 9)
  92. );
  93. var res = scheduler.Start(() =>
  94. xs.TakeLastBuffer(0)
  95. );
  96. res.Messages.AssertEqual(
  97. );
  98. xs.Subscriptions.AssertEqual(
  99. Subscribe(200, 1000)
  100. );
  101. }
  102. [Fact]
  103. public void TakeLastBuffer_One_Completed()
  104. {
  105. var scheduler = new TestScheduler();
  106. var xs = scheduler.CreateHotObservable(
  107. OnNext(180, 1),
  108. OnNext(210, 2),
  109. OnNext(250, 3),
  110. OnNext(270, 4),
  111. OnNext(310, 5),
  112. OnNext(360, 6),
  113. OnNext(380, 7),
  114. OnNext(410, 8),
  115. OnNext(590, 9),
  116. OnCompleted<int>(650)
  117. );
  118. var res = scheduler.Start(() =>
  119. xs.TakeLastBuffer(1)
  120. );
  121. res.Messages.AssertEqual(
  122. OnNext<IList<int>>(650, lst => lst.SequenceEqual(new[] { 9 })),
  123. OnCompleted<IList<int>>(650)
  124. );
  125. xs.Subscriptions.AssertEqual(
  126. Subscribe(200, 650)
  127. );
  128. }
  129. [Fact]
  130. public void TakeLastBuffer_One_Error()
  131. {
  132. var scheduler = new TestScheduler();
  133. var ex = new Exception();
  134. var xs = scheduler.CreateHotObservable(
  135. OnNext(180, 1),
  136. OnNext(210, 2),
  137. OnNext(250, 3),
  138. OnNext(270, 4),
  139. OnNext(310, 5),
  140. OnNext(360, 6),
  141. OnNext(380, 7),
  142. OnNext(410, 8),
  143. OnNext(590, 9),
  144. OnError<int>(650, ex)
  145. );
  146. var res = scheduler.Start(() =>
  147. xs.TakeLastBuffer(1)
  148. );
  149. res.Messages.AssertEqual(
  150. OnError<IList<int>>(650, ex)
  151. );
  152. xs.Subscriptions.AssertEqual(
  153. Subscribe(200, 650)
  154. );
  155. }
  156. [Fact]
  157. public void TakeLastBuffer_One_Disposed()
  158. {
  159. var scheduler = new TestScheduler();
  160. var xs = scheduler.CreateHotObservable(
  161. OnNext(180, 1),
  162. OnNext(210, 2),
  163. OnNext(250, 3),
  164. OnNext(270, 4),
  165. OnNext(310, 5),
  166. OnNext(360, 6),
  167. OnNext(380, 7),
  168. OnNext(410, 8),
  169. OnNext(590, 9)
  170. );
  171. var res = scheduler.Start(() =>
  172. xs.TakeLastBuffer(1)
  173. );
  174. res.Messages.AssertEqual(
  175. );
  176. xs.Subscriptions.AssertEqual(
  177. Subscribe(200, 1000)
  178. );
  179. }
  180. [Fact]
  181. public void TakeLastBuffer_Three_Completed()
  182. {
  183. var scheduler = new TestScheduler();
  184. var xs = scheduler.CreateHotObservable(
  185. OnNext(180, 1),
  186. OnNext(210, 2),
  187. OnNext(250, 3),
  188. OnNext(270, 4),
  189. OnNext(310, 5),
  190. OnNext(360, 6),
  191. OnNext(380, 7),
  192. OnNext(410, 8),
  193. OnNext(590, 9),
  194. OnCompleted<int>(650)
  195. );
  196. var res = scheduler.Start(() =>
  197. xs.TakeLastBuffer(3)
  198. );
  199. res.Messages.AssertEqual(
  200. OnNext<IList<int>>(650, lst => lst.SequenceEqual(new[] { 7, 8, 9 })),
  201. OnCompleted<IList<int>>(650)
  202. );
  203. xs.Subscriptions.AssertEqual(
  204. Subscribe(200, 650)
  205. );
  206. }
  207. [Fact]
  208. public void TakeLastBuffer_Three_Error()
  209. {
  210. var scheduler = new TestScheduler();
  211. var ex = new Exception();
  212. var xs = scheduler.CreateHotObservable(
  213. OnNext(180, 1),
  214. OnNext(210, 2),
  215. OnNext(250, 3),
  216. OnNext(270, 4),
  217. OnNext(310, 5),
  218. OnNext(360, 6),
  219. OnNext(380, 7),
  220. OnNext(410, 8),
  221. OnNext(590, 9),
  222. OnError<int>(650, ex)
  223. );
  224. var res = scheduler.Start(() =>
  225. xs.TakeLastBuffer(3)
  226. );
  227. res.Messages.AssertEqual(
  228. OnError<IList<int>>(650, ex)
  229. );
  230. xs.Subscriptions.AssertEqual(
  231. Subscribe(200, 650)
  232. );
  233. }
  234. [Fact]
  235. public void TakeLastBuffer_Three_Disposed()
  236. {
  237. var scheduler = new TestScheduler();
  238. var xs = scheduler.CreateHotObservable(
  239. OnNext(180, 1),
  240. OnNext(210, 2),
  241. OnNext(250, 3),
  242. OnNext(270, 4),
  243. OnNext(310, 5),
  244. OnNext(360, 6),
  245. OnNext(380, 7),
  246. OnNext(410, 8),
  247. OnNext(590, 9)
  248. );
  249. var res = scheduler.Start(() =>
  250. xs.TakeLastBuffer(3)
  251. );
  252. res.Messages.AssertEqual(
  253. );
  254. xs.Subscriptions.AssertEqual(
  255. Subscribe(200, 1000)
  256. );
  257. }
  258. #endregion
  259. #region + Timed +
  260. [Fact]
  261. public void TakeLastBuffer_Timed_ArgumentChecking()
  262. {
  263. var xs = Observable.Return(42);
  264. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(default(IObservable<int>), TimeSpan.FromSeconds(1)));
  265. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1)));
  266. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(default(IObservable<int>), TimeSpan.FromSeconds(1), Scheduler.Default));
  267. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(1), default));
  268. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1), Scheduler.Default));
  269. }
  270. [Fact]
  271. public void TakeLastBuffer_Zero1()
  272. {
  273. var scheduler = new TestScheduler();
  274. var xs = scheduler.CreateHotObservable(
  275. OnNext(210, 1),
  276. OnNext(220, 2),
  277. OnCompleted<int>(230)
  278. );
  279. var res = scheduler.Start(() =>
  280. xs.TakeLastBuffer(TimeSpan.Zero, scheduler)
  281. );
  282. res.Messages.AssertEqual(
  283. OnNext<IList<int>>(230, lst => lst.Count == 0),
  284. OnCompleted<IList<int>>(230)
  285. );
  286. xs.Subscriptions.AssertEqual(
  287. Subscribe(200, 230)
  288. );
  289. }
  290. [Fact]
  291. public void TakeLastBuffer_Zero2()
  292. {
  293. var scheduler = new TestScheduler();
  294. var xs = scheduler.CreateHotObservable(
  295. OnNext(210, 1),
  296. OnNext(220, 2),
  297. OnNext(230, 3),
  298. OnCompleted<int>(230)
  299. );
  300. var res = scheduler.Start(() =>
  301. xs.TakeLastBuffer(TimeSpan.Zero, scheduler)
  302. );
  303. res.Messages.AssertEqual(
  304. OnNext<IList<int>>(230, lst => lst.Count == 0),
  305. OnCompleted<IList<int>>(230)
  306. );
  307. xs.Subscriptions.AssertEqual(
  308. Subscribe(200, 230)
  309. );
  310. }
  311. [Fact]
  312. public void TakeLastBuffer_Some1()
  313. {
  314. var scheduler = new TestScheduler();
  315. var xs = scheduler.CreateHotObservable(
  316. OnNext(210, 1),
  317. OnNext(220, 2),
  318. OnNext(230, 3),
  319. OnCompleted<int>(240)
  320. );
  321. var res = scheduler.Start(() =>
  322. xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler)
  323. );
  324. res.Messages.AssertEqual(
  325. OnNext<IList<int>>(240, lst => lst.SequenceEqual(new[] { 2, 3 })),
  326. OnCompleted<IList<int>>(240)
  327. );
  328. xs.Subscriptions.AssertEqual(
  329. Subscribe(200, 240)
  330. );
  331. }
  332. [Fact]
  333. public void TakeLastBuffer_Some2()
  334. {
  335. var scheduler = new TestScheduler();
  336. var xs = scheduler.CreateHotObservable(
  337. OnNext(210, 1),
  338. OnNext(220, 2),
  339. OnNext(230, 3),
  340. OnCompleted<int>(300)
  341. );
  342. var res = scheduler.Start(() =>
  343. xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler)
  344. );
  345. res.Messages.AssertEqual(
  346. OnNext<IList<int>>(300, lst => lst.Count == 0),
  347. OnCompleted<IList<int>>(300)
  348. );
  349. xs.Subscriptions.AssertEqual(
  350. Subscribe(200, 300)
  351. );
  352. }
  353. [Fact]
  354. public void TakeLastBuffer_Some3()
  355. {
  356. var scheduler = new TestScheduler();
  357. var xs = scheduler.CreateHotObservable(
  358. OnNext(210, 1),
  359. OnNext(220, 2),
  360. OnNext(230, 3),
  361. OnNext(240, 4),
  362. OnNext(250, 5),
  363. OnNext(260, 6),
  364. OnNext(270, 7),
  365. OnNext(280, 8),
  366. OnNext(290, 9),
  367. OnCompleted<int>(300)
  368. );
  369. var res = scheduler.Start(() =>
  370. xs.TakeLastBuffer(TimeSpan.FromTicks(45), scheduler)
  371. );
  372. res.Messages.AssertEqual(
  373. OnNext<IList<int>>(300, lst => lst.SequenceEqual(new[] { 6, 7, 8, 9 })),
  374. OnCompleted<IList<int>>(300)
  375. );
  376. xs.Subscriptions.AssertEqual(
  377. Subscribe(200, 300)
  378. );
  379. }
  380. [Fact]
  381. public void TakeLastBuffer_Some4()
  382. {
  383. var scheduler = new TestScheduler();
  384. var xs = scheduler.CreateHotObservable(
  385. OnNext(210, 1),
  386. OnNext(240, 2),
  387. OnNext(250, 3),
  388. OnNext(280, 4),
  389. OnNext(290, 5),
  390. OnNext(300, 6),
  391. OnCompleted<int>(350)
  392. );
  393. var res = scheduler.Start(() =>
  394. xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler)
  395. );
  396. res.Messages.AssertEqual(
  397. OnNext<IList<int>>(350, lst => lst.Count == 0),
  398. OnCompleted<IList<int>>(350)
  399. );
  400. xs.Subscriptions.AssertEqual(
  401. Subscribe(200, 350)
  402. );
  403. }
  404. [Fact]
  405. public void TakeLastBuffer_All()
  406. {
  407. var scheduler = new TestScheduler();
  408. var xs = scheduler.CreateHotObservable(
  409. OnNext(210, 1),
  410. OnNext(220, 2),
  411. OnCompleted<int>(230)
  412. );
  413. var res = scheduler.Start(() =>
  414. xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
  415. );
  416. res.Messages.AssertEqual(
  417. OnNext<IList<int>>(230, lst => lst.SequenceEqual(new[] { 1, 2 })),
  418. OnCompleted<IList<int>>(230)
  419. );
  420. xs.Subscriptions.AssertEqual(
  421. Subscribe(200, 230)
  422. );
  423. }
  424. [Fact]
  425. public void TakeLastBuffer_Error()
  426. {
  427. var scheduler = new TestScheduler();
  428. var ex = new Exception();
  429. var xs = scheduler.CreateHotObservable(
  430. OnError<int>(210, ex)
  431. );
  432. var res = scheduler.Start(() =>
  433. xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
  434. );
  435. res.Messages.AssertEqual(
  436. OnError<IList<int>>(210, ex)
  437. );
  438. xs.Subscriptions.AssertEqual(
  439. Subscribe(200, 210)
  440. );
  441. }
  442. [Fact]
  443. public void TakeLastBuffer_Never()
  444. {
  445. var scheduler = new TestScheduler();
  446. var ex = new Exception();
  447. var xs = scheduler.CreateHotObservable<int>(
  448. );
  449. var res = scheduler.Start(() =>
  450. xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
  451. );
  452. res.Messages.AssertEqual(
  453. );
  454. xs.Subscriptions.AssertEqual(
  455. Subscribe(200, 1000)
  456. );
  457. }
  458. [Fact]
  459. public void TakeLastBuffer_Default1()
  460. {
  461. var xs = Observable.Range(0, 10, Scheduler.Default);
  462. var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60)).SingleAsync();
  463. var e = new ManualResetEvent(false);
  464. var lst = default(IList<int>);
  465. res.Subscribe(
  466. x => lst = x,
  467. () => e.Set()
  468. );
  469. e.WaitOne();
  470. Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10)));
  471. }
  472. [Fact]
  473. public void TakeLastBuffer_Default2()
  474. {
  475. var xs = Observable.Range(0, 10, Scheduler.Default);
  476. var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations()).SingleAsync();
  477. var e = new ManualResetEvent(false);
  478. var lst = default(IList<int>);
  479. res.Subscribe(
  480. x => lst = x,
  481. () => e.Set()
  482. );
  483. e.WaitOne();
  484. Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10)));
  485. }
  486. [Fact]
  487. public void TakeLastBuffer_Default3()
  488. {
  489. var xs = Observable.Range(0, 10, Scheduler.Default);
  490. var res = xs.TakeLastBuffer(TimeSpan.Zero).SingleAsync();
  491. var e = new ManualResetEvent(false);
  492. var lst = default(IList<int>);
  493. res.Subscribe(
  494. x => lst = x,
  495. () => e.Set()
  496. );
  497. e.WaitOne();
  498. Assert.True(lst.Count == 0);
  499. }
  500. [Fact]
  501. public void TakeLastBuffer_Default4()
  502. {
  503. var xs = Observable.Range(0, 10, Scheduler.Default);
  504. var res = xs.TakeLastBuffer(TimeSpan.Zero, Scheduler.Default.DisableOptimizations()).SingleAsync();
  505. var e = new ManualResetEvent(false);
  506. var lst = default(IList<int>);
  507. res.Subscribe(
  508. x => lst = x,
  509. () => e.Set()
  510. );
  511. e.WaitOne();
  512. Assert.True(lst.Count == 0);
  513. }
  514. #endregion
  515. }
  516. }