ThrottleTest.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Linq;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. using Assert = Xunit.Assert;
  13. namespace ReactiveTests.Tests
  14. {
  15. [TestClass]
  16. public class ThrottleTest : ReactiveTest
  17. {
  18. [TestMethod]
  19. public void Throttle_ArgumentChecking()
  20. {
  21. var scheduler = new TestScheduler();
  22. var someObservable = Observable.Empty<int>();
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(default(IObservable<int>), TimeSpan.Zero));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(someObservable, TimeSpan.Zero, null));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(default(IObservable<int>), TimeSpan.Zero, scheduler));
  26. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(someObservable, TimeSpan.FromSeconds(-1)));
  27. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(someObservable, TimeSpan.FromSeconds(-1), scheduler));
  28. }
  29. private IEnumerable<Recorded<Notification<T>>> Generate<T, S>(S seed, Func<S, bool> condition, Func<S, S> iterate, Func<S, Recorded<Notification<T>>> selector, Func<S, Recorded<Notification<T>>> final)
  30. {
  31. S s;
  32. for (s = seed; condition(s); s = iterate(s))
  33. {
  34. yield return selector(s);
  35. }
  36. yield return final(s);
  37. }
  38. [TestMethod]
  39. public void Throttle_TimeSpan_AllPass()
  40. {
  41. var scheduler = new TestScheduler();
  42. var xs = scheduler.CreateHotObservable(
  43. OnNext(150, 0),
  44. OnNext(210, 1),
  45. OnNext(240, 2),
  46. OnNext(270, 3),
  47. OnNext(300, 4),
  48. OnCompleted<int>(400)
  49. );
  50. var res = scheduler.Start(() =>
  51. xs.Throttle(TimeSpan.FromTicks(20), scheduler)
  52. );
  53. res.Messages.AssertEqual(
  54. OnNext(230, 1),
  55. OnNext(260, 2),
  56. OnNext(290, 3),
  57. OnNext(320, 4),
  58. OnCompleted<int>(400)
  59. );
  60. xs.Subscriptions.AssertEqual(
  61. Subscribe(200, 400)
  62. );
  63. }
  64. [TestMethod]
  65. public void Throttle_TimeSpan_AllPass_ErrorEnd()
  66. {
  67. var ex = new Exception();
  68. var scheduler = new TestScheduler();
  69. var xs = scheduler.CreateHotObservable(
  70. OnNext(150, 0),
  71. OnNext(210, 1),
  72. OnNext(240, 2),
  73. OnNext(270, 3),
  74. OnNext(300, 4),
  75. OnError<int>(400, ex)
  76. );
  77. var res = scheduler.Start(() =>
  78. xs.Throttle(TimeSpan.FromTicks(20), scheduler)
  79. );
  80. res.Messages.AssertEqual(
  81. OnNext(230, 1),
  82. OnNext(260, 2),
  83. OnNext(290, 3),
  84. OnNext(320, 4),
  85. OnError<int>(400, ex)
  86. );
  87. xs.Subscriptions.AssertEqual(
  88. Subscribe(200, 400)
  89. );
  90. }
  91. [TestMethod]
  92. public void Throttle_TimeSpan_AllDrop()
  93. {
  94. var scheduler = new TestScheduler();
  95. var xs = scheduler.CreateHotObservable(
  96. OnNext(150, 0),
  97. OnNext(210, 1),
  98. OnNext(240, 2),
  99. OnNext(270, 3),
  100. OnNext(300, 4),
  101. OnNext(330, 5),
  102. OnNext(360, 6),
  103. OnNext(390, 7),
  104. OnCompleted<int>(400)
  105. );
  106. var res = scheduler.Start(() =>
  107. xs.Throttle(TimeSpan.FromTicks(40), scheduler)
  108. );
  109. res.Messages.AssertEqual(
  110. OnNext(400, 7),
  111. OnCompleted<int>(400)
  112. );
  113. xs.Subscriptions.AssertEqual(
  114. Subscribe(200, 400)
  115. );
  116. }
  117. [TestMethod]
  118. public void Throttle_TimeSpan_AllDrop_ErrorEnd()
  119. {
  120. var ex = new Exception();
  121. var scheduler = new TestScheduler();
  122. var xs = scheduler.CreateHotObservable(
  123. OnNext(150, 0),
  124. OnNext(210, 1),
  125. OnNext(240, 2),
  126. OnNext(270, 3),
  127. OnNext(300, 4),
  128. OnNext(330, 5),
  129. OnNext(360, 6),
  130. OnNext(390, 7),
  131. OnError<int>(400, ex)
  132. );
  133. var res = scheduler.Start(() =>
  134. xs.Throttle(TimeSpan.FromTicks(40), scheduler)
  135. );
  136. res.Messages.AssertEqual(
  137. OnError<int>(400, ex)
  138. );
  139. xs.Subscriptions.AssertEqual(
  140. Subscribe(200, 400)
  141. );
  142. }
  143. [TestMethod]
  144. public void Throttle_Empty()
  145. {
  146. var scheduler = new TestScheduler();
  147. var xs = scheduler.CreateHotObservable(
  148. OnNext(150, 0),
  149. OnCompleted<int>(300)
  150. );
  151. var res = scheduler.Start(() =>
  152. xs.Throttle(TimeSpan.FromTicks(10), scheduler)
  153. );
  154. res.Messages.AssertEqual(
  155. OnCompleted<int>(300)
  156. );
  157. xs.Subscriptions.AssertEqual(
  158. Subscribe(200, 300)
  159. );
  160. }
  161. [TestMethod]
  162. public void Throttle_Error()
  163. {
  164. var scheduler = new TestScheduler();
  165. var ex = new Exception();
  166. var xs = scheduler.CreateHotObservable(
  167. OnNext(150, 0),
  168. OnError<int>(300, ex)
  169. );
  170. var res = scheduler.Start(() =>
  171. xs.Throttle(TimeSpan.FromTicks(10), scheduler)
  172. );
  173. res.Messages.AssertEqual(
  174. OnError<int>(300, ex)
  175. );
  176. xs.Subscriptions.AssertEqual(
  177. Subscribe(200, 300)
  178. );
  179. }
  180. [TestMethod]
  181. public void Throttle_Never()
  182. {
  183. var scheduler = new TestScheduler();
  184. var xs = scheduler.CreateHotObservable(
  185. OnNext(150, 0)
  186. );
  187. var res = scheduler.Start(() =>
  188. xs.Throttle(TimeSpan.FromTicks(10), scheduler)
  189. );
  190. res.Messages.AssertEqual(
  191. );
  192. xs.Subscriptions.AssertEqual(
  193. Subscribe(200, 1000)
  194. );
  195. }
  196. [TestMethod]
  197. public void Throttle_Simple()
  198. {
  199. var scheduler = new TestScheduler();
  200. var xs = scheduler.CreateHotObservable(
  201. OnNext(150, 0),
  202. OnNext(210, 1),
  203. OnNext(240, 2),
  204. OnNext(250, 3),
  205. OnNext(280, 4),
  206. OnCompleted<int>(300)
  207. );
  208. var res = scheduler.Start(() =>
  209. xs.Throttle(TimeSpan.FromTicks(20), scheduler)
  210. );
  211. res.Messages.AssertEqual(
  212. OnNext(230, 1),
  213. OnNext(270, 3),
  214. OnNext(300, 4),
  215. OnCompleted<int>(300)
  216. );
  217. xs.Subscriptions.AssertEqual(
  218. Subscribe(200, 300)
  219. );
  220. }
  221. [TestMethod]
  222. public void Throttle_DefaultScheduler()
  223. {
  224. Assert.True(Observable.Return(1).Throttle(TimeSpan.FromMilliseconds(1)).ToEnumerable().SequenceEqual(new[] { 1 }));
  225. }
  226. [TestMethod]
  227. public void Throttle_Duration_ArgumentChecking()
  228. {
  229. var someObservable = DummyObservable<int>.Instance;
  230. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(default(IObservable<int>), x => someObservable));
  231. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(someObservable, default(Func<int, IObservable<string>>)));
  232. }
  233. [TestMethod]
  234. public void Throttle_Duration_DelayBehavior()
  235. {
  236. var scheduler = new TestScheduler();
  237. var xs = scheduler.CreateHotObservable(
  238. OnNext(150, -1),
  239. OnNext(250, 0),
  240. OnNext(280, 1),
  241. OnNext(310, 2),
  242. OnNext(350, 3),
  243. OnNext(400, 4),
  244. OnCompleted<int>(550)
  245. );
  246. var ys = new[] {
  247. scheduler.CreateColdObservable(
  248. OnNext(20, 42),
  249. OnNext(25, 99)
  250. ),
  251. scheduler.CreateColdObservable(
  252. OnNext(20, 42),
  253. OnNext(25, 99)
  254. ),
  255. scheduler.CreateColdObservable(
  256. OnNext(20, 42),
  257. OnNext(25, 99)
  258. ),
  259. scheduler.CreateColdObservable(
  260. OnNext(20, 42),
  261. OnNext(25, 99)
  262. ),
  263. scheduler.CreateColdObservable(
  264. OnNext(20, 42),
  265. OnNext(25, 99)
  266. ),
  267. };
  268. var res = scheduler.Start(() =>
  269. xs.Throttle(x => ys[x])
  270. );
  271. res.Messages.AssertEqual(
  272. OnNext(250 + 20, 0),
  273. OnNext(280 + 20, 1),
  274. OnNext(310 + 20, 2),
  275. OnNext(350 + 20, 3),
  276. OnNext(400 + 20, 4),
  277. OnCompleted<int>(550)
  278. );
  279. xs.Subscriptions.AssertEqual(
  280. Subscribe(200, 550)
  281. );
  282. ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
  283. ys[1].Subscriptions.AssertEqual(Subscribe(280, 280 + 20));
  284. ys[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20));
  285. ys[3].Subscriptions.AssertEqual(Subscribe(350, 350 + 20));
  286. ys[4].Subscriptions.AssertEqual(Subscribe(400, 400 + 20));
  287. }
  288. [TestMethod]
  289. public void Throttle_Duration_ThrottleBehavior()
  290. {
  291. var scheduler = new TestScheduler();
  292. var xs = scheduler.CreateHotObservable(
  293. OnNext(150, -1),
  294. OnNext(250, 0),
  295. OnNext(280, 1),
  296. OnNext(310, 2),
  297. OnNext(350, 3),
  298. OnNext(400, 4),
  299. OnCompleted<int>(550)
  300. );
  301. var ys = new[] {
  302. scheduler.CreateColdObservable(
  303. OnNext(20, 42),
  304. OnNext(25, 99)
  305. ),
  306. scheduler.CreateColdObservable(
  307. OnNext(40, 42),
  308. OnNext(45, 99)
  309. ),
  310. scheduler.CreateColdObservable(
  311. OnNext(20, 42),
  312. OnNext(25, 99)
  313. ),
  314. scheduler.CreateColdObservable(
  315. OnNext(60, 42),
  316. OnNext(65, 99)
  317. ),
  318. scheduler.CreateColdObservable(
  319. OnNext(20, 42),
  320. OnNext(25, 99)
  321. ),
  322. };
  323. var res = scheduler.Start(() =>
  324. xs.Throttle(x => ys[x])
  325. );
  326. res.Messages.AssertEqual(
  327. OnNext(250 + 20, 0),
  328. OnNext(310 + 20, 2),
  329. OnNext(400 + 20, 4),
  330. OnCompleted<int>(550)
  331. );
  332. xs.Subscriptions.AssertEqual(
  333. Subscribe(200, 550)
  334. );
  335. ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
  336. ys[1].Subscriptions.AssertEqual(Subscribe(280, 310));
  337. ys[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20));
  338. ys[3].Subscriptions.AssertEqual(Subscribe(350, 400));
  339. ys[4].Subscriptions.AssertEqual(Subscribe(400, 400 + 20));
  340. }
  341. [TestMethod]
  342. public void Throttle_Duration_EarlyCompletion()
  343. {
  344. var scheduler = new TestScheduler();
  345. var xs = scheduler.CreateHotObservable(
  346. OnNext(150, -1),
  347. OnNext(250, 0),
  348. OnNext(280, 1),
  349. OnNext(310, 2),
  350. OnNext(350, 3),
  351. OnNext(400, 4),
  352. OnCompleted<int>(410)
  353. );
  354. var ys = new[] {
  355. scheduler.CreateColdObservable(
  356. OnNext(20, 42),
  357. OnNext(25, 99)
  358. ),
  359. scheduler.CreateColdObservable(
  360. OnNext(40, 42),
  361. OnNext(45, 99)
  362. ),
  363. scheduler.CreateColdObservable(
  364. OnNext(20, 42),
  365. OnNext(25, 99)
  366. ),
  367. scheduler.CreateColdObservable(
  368. OnNext(60, 42),
  369. OnNext(65, 99)
  370. ),
  371. scheduler.CreateColdObservable(
  372. OnNext(20, 42),
  373. OnNext(25, 99)
  374. ),
  375. };
  376. var res = scheduler.Start(() =>
  377. xs.Throttle(x => ys[x])
  378. );
  379. res.Messages.AssertEqual(
  380. OnNext(250 + 20, 0),
  381. OnNext(310 + 20, 2),
  382. OnNext(410, 4),
  383. OnCompleted<int>(410)
  384. );
  385. xs.Subscriptions.AssertEqual(
  386. Subscribe(200, 410)
  387. );
  388. ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
  389. ys[1].Subscriptions.AssertEqual(Subscribe(280, 310));
  390. ys[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20));
  391. ys[3].Subscriptions.AssertEqual(Subscribe(350, 400));
  392. ys[4].Subscriptions.AssertEqual(Subscribe(400, 410));
  393. }
  394. [TestMethod]
  395. public void Throttle_Duration_InnerError()
  396. {
  397. var scheduler = new TestScheduler();
  398. var xs = scheduler.CreateHotObservable(
  399. OnNext(150, 1),
  400. OnNext(250, 2),
  401. OnNext(350, 3),
  402. OnNext(450, 4),
  403. OnCompleted<int>(550)
  404. );
  405. var ex = new Exception();
  406. var res = scheduler.Start(() =>
  407. xs.Throttle(x =>
  408. x < 4 ? scheduler.CreateColdObservable(
  409. OnNext(x * 10, "Ignore"),
  410. OnNext(x * 10 + 5, "Aargh!")
  411. )
  412. : scheduler.CreateColdObservable(
  413. OnError<string>(x * 10, ex)
  414. )
  415. )
  416. );
  417. res.Messages.AssertEqual(
  418. OnNext(250 + 2 * 10, 2),
  419. OnNext(350 + 3 * 10, 3),
  420. OnError<int>(450 + 4 * 10, ex)
  421. );
  422. xs.Subscriptions.AssertEqual(
  423. Subscribe(200, 490)
  424. );
  425. }
  426. [TestMethod]
  427. public void Throttle_Duration_OuterError()
  428. {
  429. var scheduler = new TestScheduler();
  430. var ex = new Exception();
  431. var xs = scheduler.CreateHotObservable(
  432. OnNext(150, 1),
  433. OnNext(250, 2),
  434. OnNext(350, 3),
  435. OnNext(450, 4),
  436. OnError<int>(460, ex)
  437. );
  438. var res = scheduler.Start(() =>
  439. xs.Throttle(x =>
  440. scheduler.CreateColdObservable(
  441. OnNext(x * 10, "Ignore"),
  442. OnNext(x * 10 + 5, "Aargh!")
  443. )
  444. )
  445. );
  446. res.Messages.AssertEqual(
  447. OnNext(250 + 2 * 10, 2),
  448. OnNext(350 + 3 * 10, 3),
  449. OnError<int>(460, ex)
  450. );
  451. xs.Subscriptions.AssertEqual(
  452. Subscribe(200, 460)
  453. );
  454. }
  455. [TestMethod]
  456. public void Throttle_Duration_SelectorThrows()
  457. {
  458. var scheduler = new TestScheduler();
  459. var xs = scheduler.CreateHotObservable(
  460. OnNext(150, 1),
  461. OnNext(250, 2),
  462. OnNext(350, 3),
  463. OnNext(450, 4),
  464. OnCompleted<int>(550)
  465. );
  466. var ex = new Exception();
  467. var res = scheduler.Start(() =>
  468. xs.Throttle(x =>
  469. {
  470. if (x < 4)
  471. {
  472. return scheduler.CreateColdObservable(
  473. OnNext(x * 10, "Ignore"),
  474. OnNext(x * 10 + 5, "Aargh!")
  475. );
  476. }
  477. else
  478. {
  479. throw ex;
  480. }
  481. })
  482. );
  483. res.Messages.AssertEqual(
  484. OnNext(250 + 2 * 10, 2),
  485. OnNext(350 + 3 * 10, 3),
  486. OnError<int>(450, ex)
  487. );
  488. xs.Subscriptions.AssertEqual(
  489. Subscribe(200, 450)
  490. );
  491. }
  492. [TestMethod]
  493. public void Throttle_Duration_InnerDone_DelayBehavior()
  494. {
  495. var scheduler = new TestScheduler();
  496. var xs = scheduler.CreateHotObservable(
  497. OnNext(150, 1),
  498. OnNext(250, 2),
  499. OnNext(350, 3),
  500. OnNext(450, 4),
  501. OnCompleted<int>(550)
  502. );
  503. var ex = new Exception();
  504. var res = scheduler.Start(() =>
  505. xs.Throttle(x =>
  506. scheduler.CreateColdObservable(
  507. OnCompleted<string>(x * 10)
  508. )
  509. )
  510. );
  511. res.Messages.AssertEqual(
  512. OnNext(250 + 2 * 10, 2),
  513. OnNext(350 + 3 * 10, 3),
  514. OnNext(450 + 4 * 10, 4),
  515. OnCompleted<int>(550)
  516. );
  517. xs.Subscriptions.AssertEqual(
  518. Subscribe(200, 550)
  519. );
  520. }
  521. [TestMethod]
  522. public void Throttle_Duration_InnerDone_ThrottleBehavior()
  523. {
  524. var scheduler = new TestScheduler();
  525. var xs = scheduler.CreateHotObservable(
  526. OnNext(150, 1),
  527. OnNext(250, 2),
  528. OnNext(280, 3),
  529. OnNext(300, 4),
  530. OnNext(400, 5),
  531. OnNext(410, 6),
  532. OnCompleted<int>(550)
  533. );
  534. var ex = new Exception();
  535. var res = scheduler.Start(() =>
  536. xs.Throttle(x =>
  537. scheduler.CreateColdObservable(
  538. OnCompleted<string>(x * 10)
  539. )
  540. )
  541. );
  542. res.Messages.AssertEqual(
  543. OnNext(250 + 2 * 10, 2),
  544. OnNext(300 + 4 * 10, 4),
  545. OnNext(410 + 6 * 10, 6),
  546. OnCompleted<int>(550)
  547. );
  548. xs.Subscriptions.AssertEqual(
  549. Subscribe(200, 550)
  550. );
  551. }
  552. }
  553. }