TakeLastBufferTest.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using ReactiveTests.Dummies;
  12. using Microsoft.VisualStudio.TestTools.UnitTesting;
  13. using Assert = Xunit.Assert;
  14. namespace ReactiveTests.Tests
  15. {
  /// <summary>
  /// Tests for the count-based and time-based <c>TakeLastBuffer</c> operators.
  /// The virtual-time tests run on a <see cref="TestScheduler"/>; their expectations
  /// show the source being subscribed at tick 200 and, when it never terminates,
  /// released at tick 1000.
  /// </summary>
  [TestClass]
  public class TakeLastBufferTest : ReactiveTest
  {
      #region + Count +

      /// <summary>A null source and a negative count are both rejected.</summary>
      [TestMethod]
      public void TakeLastBuffer_ArgumentChecking()
      {
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer<int>(null, 0));
          ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(DummyObservable<int>.Instance, -1));
      }

      /// <summary>With a count of zero, completion at 650 yields one empty list followed by completion.</summary>
      [TestMethod]
      public void TakeLastBuffer_Zero_Completed()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9),
              OnCompleted<int>(650)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(0)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(650, lst => lst.Count == 0),
              OnCompleted<IList<int>>(650)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      /// <summary>With a count of zero, a source error at 650 is forwarded and no list is emitted.</summary>
      [TestMethod]
      public void TakeLastBuffer_Zero_Error()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var xs = scheduler.CreateHotObservable(
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9),
              OnError<int>(650, ex)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(0)
          );

          res.Messages.AssertEqual(
              OnError<IList<int>>(650, ex)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      /// <summary>With a count of zero and a source that never terminates, nothing is emitted before the subscription ends at 1000.</summary>
      [TestMethod]
      public void TakeLastBuffer_Zero_Disposed()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(0)
          );

          res.Messages.AssertEqual(
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      /// <summary>With a count of one, completion at 650 yields a list holding only the final value (9).</summary>
      [TestMethod]
      public void TakeLastBuffer_One_Completed()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9),
              OnCompleted<int>(650)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(1)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(650, lst => lst.SequenceEqual([9])),
              OnCompleted<IList<int>>(650)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      /// <summary>With a count of one, a source error at 650 is forwarded and no list is emitted.</summary>
      [TestMethod]
      public void TakeLastBuffer_One_Error()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var xs = scheduler.CreateHotObservable(
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9),
              OnError<int>(650, ex)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(1)
          );

          res.Messages.AssertEqual(
              OnError<IList<int>>(650, ex)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      /// <summary>With a count of one and a source that never terminates, nothing is emitted before the subscription ends at 1000.</summary>
      [TestMethod]
      public void TakeLastBuffer_One_Disposed()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(1)
          );

          res.Messages.AssertEqual(
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      /// <summary>With a count of three, completion at 650 yields the final three values (7, 8, 9) in order.</summary>
      [TestMethod]
      public void TakeLastBuffer_Three_Completed()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9),
              OnCompleted<int>(650)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(3)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(650, lst => lst.SequenceEqual([7, 8, 9])),
              OnCompleted<IList<int>>(650)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      /// <summary>With a count of three, a source error at 650 is forwarded and no list is emitted.</summary>
      [TestMethod]
      public void TakeLastBuffer_Three_Error()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var xs = scheduler.CreateHotObservable(
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9),
              OnError<int>(650, ex)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(3)
          );

          res.Messages.AssertEqual(
              OnError<IList<int>>(650, ex)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      /// <summary>With a count of three and a source that never terminates, nothing is emitted before the subscription ends at 1000.</summary>
      [TestMethod]
      public void TakeLastBuffer_Three_Disposed()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(3)
          );

          res.Messages.AssertEqual(
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      #endregion

      #region + Timed +

      /// <summary>A null source, a null scheduler and a negative duration are rejected by the timed overloads.</summary>
      [TestMethod]
      public void TakeLastBuffer_Timed_ArgumentChecking()
      {
          var xs = Observable.Return(42);

          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(default(IObservable<int>), TimeSpan.FromSeconds(1)));
          ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1)));

          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(default(IObservable<int>), TimeSpan.FromSeconds(1), Scheduler.Default));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(1), default));
          ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1), Scheduler.Default));
      }

      /// <summary>A zero duration yields an empty list when the source completes at 230.</summary>
      [TestMethod]
      public void TakeLastBuffer_Zero1()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(210, 1),
              OnNext(220, 2),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.Zero, scheduler)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(230, lst => lst.Count == 0),
              OnCompleted<IList<int>>(230)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 230)
          );
      }

      /// <summary>A zero duration yields an empty list even when a value arrives at the same tick (230) as completion.</summary>
      [TestMethod]
      public void TakeLastBuffer_Zero2()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.Zero, scheduler)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(230, lst => lst.Count == 0),
              OnCompleted<IList<int>>(230)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 230)
          );
      }

      /// <summary>A 25-tick duration with completion at 240 keeps the values from 220 and 230 and drops the one from 210.</summary>
      [TestMethod]
      public void TakeLastBuffer_Some1()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnCompleted<int>(240)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(240, lst => lst.SequenceEqual([2, 3])),
              OnCompleted<IList<int>>(240)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 240)
          );
      }

      /// <summary>A 25-tick duration with completion at 300 yields an empty list because the last value arrived at 230.</summary>
      [TestMethod]
      public void TakeLastBuffer_Some2()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnCompleted<int>(300)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(300, lst => lst.Count == 0),
              OnCompleted<IList<int>>(300)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 300)
          );
      }

      /// <summary>A 45-tick duration with completion at 300 keeps the values from 260 through 290 (6, 7, 8, 9).</summary>
      [TestMethod]
      public void TakeLastBuffer_Some3()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnNext(240, 4),
              OnNext(250, 5),
              OnNext(260, 6),
              OnNext(270, 7),
              OnNext(280, 8),
              OnNext(290, 9),
              OnCompleted<int>(300)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(45), scheduler)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(300, lst => lst.SequenceEqual([6, 7, 8, 9])),
              OnCompleted<IList<int>>(300)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 300)
          );
      }

      /// <summary>A 25-tick duration with completion at 350 yields an empty list because the last value arrived at 300.</summary>
      [TestMethod]
      public void TakeLastBuffer_Some4()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(210, 1),
              OnNext(240, 2),
              OnNext(250, 3),
              OnNext(280, 4),
              OnNext(290, 5),
              OnNext(300, 6),
              OnCompleted<int>(350)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(350, lst => lst.Count == 0),
              OnCompleted<IList<int>>(350)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 350)
          );
      }

      /// <summary>A 50-tick duration with completion at 230 keeps every value received (1, 2).</summary>
      [TestMethod]
      public void TakeLastBuffer_All()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(210, 1),
              OnNext(220, 2),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(230, lst => lst.SequenceEqual([1, 2])),
              OnCompleted<IList<int>>(230)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 230)
          );
      }

      /// <summary>A source error at 210 is forwarded by the timed overload and no list is emitted.</summary>
      [TestMethod]
      public void TakeLastBuffer_Error()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var xs = scheduler.CreateHotObservable(
              OnError<int>(210, ex)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
          );

          res.Messages.AssertEqual(
              OnError<IList<int>>(210, ex)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 210)
          );
      }

      /// <summary>A source with no notifications produces no messages, and the subscription lasts until 1000.</summary>
      [TestMethod]
      public void TakeLastBuffer_Never()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
          );

          res.Messages.AssertEqual(
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      /// <summary>Without an explicit scheduler, a 60-second duration keeps all ten values of the range.</summary>
      [TestMethod]
      public void TakeLastBuffer_Default1()
      {
          var xs = Observable.Range(0, 10, Scheduler.Default);

          var lst = WaitForSingleBuffer(xs.TakeLastBuffer(TimeSpan.FromSeconds(60)));

          Assert.Equal(Enumerable.Range(0, 10), lst);
      }

      /// <summary>Same as <see cref="TakeLastBuffer_Default1"/>, but on the default scheduler with optimizations disabled.</summary>
      [TestMethod]
      public void TakeLastBuffer_Default2()
      {
          var xs = Observable.Range(0, 10, Scheduler.Default);

          var lst = WaitForSingleBuffer(xs.TakeLastBuffer(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations()));

          Assert.Equal(Enumerable.Range(0, 10), lst);
      }

      /// <summary>Without an explicit scheduler, a zero duration produces an empty list.</summary>
      [TestMethod]
      public void TakeLastBuffer_Default3()
      {
          var xs = Observable.Range(0, 10, Scheduler.Default);

          var lst = WaitForSingleBuffer(xs.TakeLastBuffer(TimeSpan.Zero));

          Assert.Empty(lst);
      }

      /// <summary>Same as <see cref="TakeLastBuffer_Default3"/>, but on the default scheduler with optimizations disabled.</summary>
      [TestMethod]
      public void TakeLastBuffer_Default4()
      {
          var xs = Observable.Range(0, 10, Scheduler.Default);

          var lst = WaitForSingleBuffer(xs.TakeLastBuffer(TimeSpan.Zero, Scheduler.Default.DisableOptimizations()));

          Assert.Empty(lst);
      }

      /// <summary>
      /// Subscribes to the single element of <paramref name="buffers"/>, blocks the calling
      /// thread until the sequence terminates, and returns the list that was received.
      /// </summary>
      /// <param name="buffers">The buffered sequence under test; expected to produce exactly one list.</param>
      /// <returns>The list delivered through <c>OnNext</c>.</returns>
      /// <remarks>
      /// A terminal error is captured and rethrown on the calling thread so that it is
      /// reported as a failure of the calling test.
      /// </remarks>
      private static IList<int> WaitForSingleBuffer(IObservable<IList<int>> buffers)
      {
          using var terminated = new ManualResetEvent(false);

          var received = default(IList<int>);
          var failure = default(Exception);

          using (buffers.SingleAsync().Subscribe(
              x => received = x,
              ex =>
              {
                  failure = ex;
                  terminated.Set();
              },
              () => terminated.Set()))
          {
              // The wait handle is only signalled from a terminal notification, so the
              // writes to 'received' and 'failure' have happened before this returns.
              terminated.WaitOne();
          }

          if (failure != null)
          {
              // Preserve the original stack trace when surfacing the error to the test.
              System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failure).Throw();
          }

          return received;
      }

      #endregion
  }
  518. }