TakeLastBufferTest.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  /// <summary>
  /// Tests for the count-based and time-based overloads of <c>Observable.TakeLastBuffer</c>.
  /// </summary>
  public class TakeLastBufferTest : ReactiveTest
  {
      /// <summary>
      /// Upper bound on how long the <c>TakeLastBuffer_Default*</c> tests wait for the sequence
      /// running on <see cref="Scheduler.Default"/> to terminate. Hitting it fails the test
      /// instead of blocking the test run indefinitely.
      /// </summary>
      private static readonly TimeSpan DefaultSchedulerTimeout = TimeSpan.FromSeconds(30);

      #region + Count +

      [Fact]
      public void TakeLastBuffer_ArgumentChecking()
      {
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer<int>(null, 0));
          ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(DummyObservable<int>.Instance, -1));
      }

      [Fact]
      public void TakeLastBuffer_Zero_Completed()
      {
          var scheduler = new TestScheduler();
          var xs = CreateCountSource(scheduler, OnCompleted<int>(650));

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(0)
          );

          // A count of zero still yields one (empty) list when the source completes.
          res.Messages.AssertEqual(
              OnNext<IList<int>>(650, lst => lst.Count == 0),
              OnCompleted<IList<int>>(650)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      [Fact]
      public void TakeLastBuffer_Zero_Error()
      {
          var scheduler = new TestScheduler();
          var ex = new Exception();
          var xs = CreateCountSource(scheduler, OnError<int>(650, ex));

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(0)
          );

          // The error is forwarded and no list is produced.
          res.Messages.AssertEqual(
              OnError<IList<int>>(650, ex)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      [Fact]
      public void TakeLastBuffer_Zero_Disposed()
      {
          var scheduler = new TestScheduler();
          var xs = CreateCountSource(scheduler);

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(0)
          );

          // The source never terminates, so nothing is observed before disposal at 1000.
          res.Messages.AssertEqual();

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      [Fact]
      public void TakeLastBuffer_One_Completed()
      {
          var scheduler = new TestScheduler();
          var xs = CreateCountSource(scheduler, OnCompleted<int>(650));

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(1)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(650, lst => lst.SequenceEqual(new[] { 9 })),
              OnCompleted<IList<int>>(650)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      [Fact]
      public void TakeLastBuffer_One_Error()
      {
          var scheduler = new TestScheduler();
          var ex = new Exception();
          var xs = CreateCountSource(scheduler, OnError<int>(650, ex));

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(1)
          );

          res.Messages.AssertEqual(
              OnError<IList<int>>(650, ex)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      [Fact]
      public void TakeLastBuffer_One_Disposed()
      {
          var scheduler = new TestScheduler();
          var xs = CreateCountSource(scheduler);

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(1)
          );

          res.Messages.AssertEqual();

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      [Fact]
      public void TakeLastBuffer_Three_Completed()
      {
          var scheduler = new TestScheduler();
          var xs = CreateCountSource(scheduler, OnCompleted<int>(650));

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(3)
          );

          res.Messages.AssertEqual(
              OnNext<IList<int>>(650, lst => lst.SequenceEqual(new[] { 7, 8, 9 })),
              OnCompleted<IList<int>>(650)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      [Fact]
      public void TakeLastBuffer_Three_Error()
      {
          var scheduler = new TestScheduler();
          var ex = new Exception();
          var xs = CreateCountSource(scheduler, OnError<int>(650, ex));

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(3)
          );

          res.Messages.AssertEqual(
              OnError<IList<int>>(650, ex)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 650)
          );
      }

      [Fact]
      public void TakeLastBuffer_Three_Disposed()
      {
          var scheduler = new TestScheduler();
          var xs = CreateCountSource(scheduler);

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(3)
          );

          res.Messages.AssertEqual();

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      /// <summary>
      /// Builds the hot source shared by all count-based tests: the values 1 through 9 at
      /// ticks 180, 210, 250, 270, 310, 360, 380, 410 and 590, followed by the supplied
      /// terminal notification(s).
      /// </summary>
      /// <param name="scheduler">Test scheduler that owns the hot observable.</param>
      /// <param name="terminal">
      /// Notifications appended after the nine values (for example an <c>OnCompleted</c> or
      /// <c>OnError</c> at 650). Pass nothing for a source that never terminates.
      /// </param>
      /// <returns>The hot observable whose subscriptions the test can assert on.</returns>
      private static ITestableObservable<int> CreateCountSource(TestScheduler scheduler, params Recorded<Notification<int>>[] terminal)
      {
          var messages = new List<Recorded<Notification<int>>>
          {
              OnNext(180, 1),
              OnNext(210, 2),
              OnNext(250, 3),
              OnNext(270, 4),
              OnNext(310, 5),
              OnNext(360, 6),
              OnNext(380, 7),
              OnNext(410, 8),
              OnNext(590, 9)
          };

          messages.AddRange(terminal);

          return scheduler.CreateHotObservable<int>(messages.ToArray());
      }

      #endregion

      #region + Timed +

      [Fact]
      public void TakeLastBuffer_Timed_ArgumentChecking()
      {
          var xs = Observable.Return(42);

          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(default(IObservable<int>), TimeSpan.FromSeconds(1)));
          ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1)));

          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(default(IObservable<int>), TimeSpan.FromSeconds(1), Scheduler.Default));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(1), default(IScheduler)));
          ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1), Scheduler.Default));
      }

      [Fact]
      public void TakeLastBuffer_Zero1()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.Zero, scheduler)
          );

          // A zero-length window keeps nothing.
          res.Messages.AssertEqual(
              OnNext<IList<int>>(230, lst => lst.Count == 0),
              OnCompleted<IList<int>>(230)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 230)
          );
      }

      [Fact]
      public void TakeLastBuffer_Zero2()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.Zero, scheduler)
          );

          // Even the value produced at the completion tick (230) is excluded by a zero window.
          res.Messages.AssertEqual(
              OnNext<IList<int>>(230, lst => lst.Count == 0),
              OnCompleted<IList<int>>(230)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 230)
          );
      }

      [Fact]
      public void TakeLastBuffer_Some1()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnCompleted<int>(240)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler)
          );

          // Completion at 240 with a 25-tick window keeps the values from 220 and 230.
          res.Messages.AssertEqual(
              OnNext<IList<int>>(240, lst => lst.SequenceEqual(new[] { 2, 3 })),
              OnCompleted<IList<int>>(240)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 240)
          );
      }

      [Fact]
      public void TakeLastBuffer_Some2()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnCompleted<int>(300)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler)
          );

          // Completion at 300: every value is older than the 25-tick window.
          res.Messages.AssertEqual(
              OnNext<IList<int>>(300, lst => lst.Count == 0),
              OnCompleted<IList<int>>(300)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 300)
          );
      }

      [Fact]
      public void TakeLastBuffer_Some3()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnNext(240, 4),
              OnNext(250, 5),
              OnNext(260, 6),
              OnNext(270, 7),
              OnNext(280, 8),
              OnNext(290, 9),
              OnCompleted<int>(300)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(45), scheduler)
          );

          // Completion at 300 with a 45-tick window keeps the values from 260 through 290.
          res.Messages.AssertEqual(
              OnNext<IList<int>>(300, lst => lst.SequenceEqual(new[] { 6, 7, 8, 9 })),
              OnCompleted<IList<int>>(300)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 300)
          );
      }

      [Fact]
      public void TakeLastBuffer_Some4()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(240, 2),
              OnNext(250, 3),
              OnNext(280, 4),
              OnNext(290, 5),
              OnNext(300, 6),
              OnCompleted<int>(350)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler)
          );

          // Completion at 350: the latest value (at 300) is already outside the 25-tick window.
          res.Messages.AssertEqual(
              OnNext<IList<int>>(350, lst => lst.Count == 0),
              OnCompleted<IList<int>>(350)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 350)
          );
      }

      [Fact]
      public void TakeLastBuffer_All()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
          );

          // The 50-tick window spans every value observed before completion at 230.
          res.Messages.AssertEqual(
              OnNext<IList<int>>(230, lst => lst.SequenceEqual(new[] { 1, 2 })),
              OnCompleted<IList<int>>(230)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 230)
          );
      }

      [Fact]
      public void TakeLastBuffer_Error()
      {
          var scheduler = new TestScheduler();
          var ex = new Exception();

          var xs = scheduler.CreateHotObservable<int>(
              OnError<int>(210, ex)
          );

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
          );

          res.Messages.AssertEqual(
              OnError<IList<int>>(210, ex)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 210)
          );
      }

      [Fact]
      public void TakeLastBuffer_Never()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>();

          var res = scheduler.Start(() =>
              xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
          );

          // A silent source produces no list and stays subscribed until disposal at 1000.
          res.Messages.AssertEqual();

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      [Fact]
      public void TakeLastBuffer_Default1()
      {
          var xs = Observable.Range(0, 10, Scheduler.Default);
          var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60)).SingleAsync();

          var lst = AwaitSingleBuffer(res);

          Assert.Equal(Enumerable.Range(0, 10), lst);
      }

      [Fact]
      public void TakeLastBuffer_Default2()
      {
          var xs = Observable.Range(0, 10, Scheduler.Default);
          var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations()).SingleAsync();

          var lst = AwaitSingleBuffer(res);

          Assert.Equal(Enumerable.Range(0, 10), lst);
      }

      [Fact]
      public void TakeLastBuffer_Default3()
      {
          var xs = Observable.Range(0, 10, Scheduler.Default);
          var res = xs.TakeLastBuffer(TimeSpan.Zero).SingleAsync();

          var lst = AwaitSingleBuffer(res);

          Assert.Empty(lst);
      }

      [Fact]
      public void TakeLastBuffer_Default4()
      {
          var xs = Observable.Range(0, 10, Scheduler.Default);
          var res = xs.TakeLastBuffer(TimeSpan.Zero, Scheduler.Default.DisableOptimizations()).SingleAsync();

          var lst = AwaitSingleBuffer(res);

          Assert.Empty(lst);
      }

      /// <summary>
      /// Subscribes to <paramref name="buffers"/>, blocks the calling thread until the sequence
      /// terminates, and returns the last list it produced.
      /// </summary>
      /// <param name="buffers">Sequence expected to yield one list and then complete.</param>
      /// <returns>The last list observed, or <c>null</c> if none was observed.</returns>
      /// <remarks>
      /// The test fails if the sequence does not terminate within
      /// <see cref="DefaultSchedulerTimeout"/> or if it terminates with an error. Both the wait
      /// handle and the subscription are released before returning.
      /// </remarks>
      private static IList<int> AwaitSingleBuffer(IObservable<IList<int>> buffers)
      {
          var result = default(IList<int>);
          var error = default(Exception);

          using (var done = new ManualResetEvent(false))
          using (var subscription = buffers.Subscribe(
              x => result = x,
              ex => { error = ex; done.Set(); },
              () => done.Set()))
          {
              // Bounded wait: a sequence that never terminates fails here instead of hanging.
              Assert.True(done.WaitOne(DefaultSchedulerTimeout), "The sequence did not terminate within the allotted time.");
          }

          // Report a failure of the sequence as a test failure on the test thread.
          Assert.Null(error);

          return result;
      }

      #endregion
  }
  522. }