ThrottleTest.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class ThrottleTest : ReactiveTest
  22. {
  /// <summary>
  /// Checks argument validation of the time-based Throttle overloads: a null source or a
  /// null scheduler must raise ArgumentNullException, and a negative due time must raise
  /// ArgumentOutOfRangeException (with and without an explicit scheduler).
  /// </summary>
  [Fact]
  public void Throttle_ArgumentChecking()
  {
      var testScheduler = new TestScheduler();
      var source = Observable.Empty<int>();
      var missingSource = default(IObservable<int>);
      var negativeDueTime = TimeSpan.FromSeconds(-1);

      // Null source / null scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(missingSource, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(source, TimeSpan.Zero, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(missingSource, TimeSpan.Zero, testScheduler));

      // Negative due time.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(source, negativeDueTime));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(source, negativeDueTime, testScheduler));
  }
  /// <summary>
  /// Lazily builds a sequence of recorded notifications by stepping a state value.
  /// Starting from <paramref name="seed"/>, yields <paramref name="selector"/> of the state
  /// for as long as <paramref name="condition"/> holds, advancing the state with
  /// <paramref name="iterate"/> after each element; then yields one last element produced
  /// by <paramref name="final"/> from the first state that failed the condition.
  /// </summary>
  /// <remarks>No test visible in this file calls this helper.</remarks>
  private IEnumerable<Recorded<Notification<T>>> Generate<T, S>(S seed, Func<S, bool> condition, Func<S, S> iterate, Func<S, Recorded<Notification<T>>> selector, Func<S, Recorded<Notification<T>>> final)
  {
      var state = seed;

      while (condition(state))
      {
          yield return selector(state);
          state = iterate(state);
      }

      // The terminating state (the one that failed the condition) feeds the final element.
      yield return final(state);
  }
  /// <summary>
  /// Source values arrive 30 ticks apart and the throttle due time is 20 ticks.
  /// Expects every value received after subscription (1..4) to be forwarded 20 ticks
  /// after its arrival, followed by completion at 400. The value at tick 150 is not
  /// expected in the output.
  /// </summary>
  [Fact]
  public void Throttle_TimeSpan_AllPass()
  {
      var scheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(20);

      var source = scheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnNext(300, 4),
          OnCompleted<int>(400)
      );

      var observer = scheduler.Start(() => Observable.Throttle(source, dueTime, scheduler));

      observer.Messages.AssertEqual(
          OnNext(230, 1),
          OnNext(260, 2),
          OnNext(290, 3),
          OnNext(320, 4),
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// Same timing as the all-pass case (values 30 ticks apart, 20-tick due time), but the
  /// source terminates with an error at 400. Expects values 1..4 each 20 ticks after
  /// arrival, then the same exception instance at 400.
  /// </summary>
  [Fact]
  public void Throttle_TimeSpan_AllPass_ErrorEnd()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(20);

      var source = scheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnNext(300, 4),
          OnError<int>(400, failure)
      );

      var observer = scheduler.Start(() => Observable.Throttle(source, dueTime, scheduler));

      observer.Messages.AssertEqual(
          OnNext(230, 1),
          OnNext(260, 2),
          OnNext(290, 3),
          OnNext(320, 4),
          OnError<int>(400, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// Source values arrive 30 ticks apart while the due time is 40 ticks, so each value is
  /// followed by another before its due time elapses. Expects only the last value (7),
  /// reported at the completion time 400, followed by completion.
  /// </summary>
  [Fact]
  public void Throttle_TimeSpan_AllDrop()
  {
      var scheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(40);

      var source = scheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnNext(300, 4),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnCompleted<int>(400)
      );

      var observer = scheduler.Start(() => Observable.Throttle(source, dueTime, scheduler));

      observer.Messages.AssertEqual(
          OnNext(400, 7),
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// Same timing as the all-drop case (values 30 ticks apart, 40-tick due time), but the
  /// source fails at 400. Expects only the error at 400; unlike the completion case, no
  /// pending value is expected before it.
  /// </summary>
  [Fact]
  public void Throttle_TimeSpan_AllDrop_ErrorEnd()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(40);

      var source = scheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnNext(300, 4),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnError<int>(400, failure)
      );

      var observer = scheduler.Start(() => Observable.Throttle(source, dueTime, scheduler));

      observer.Messages.AssertEqual(
          OnError<int>(400, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// The source's only value is at tick 150 and it completes at 300.
  /// Expects the throttled sequence to contain nothing but the completion at 300.
  /// </summary>
  [Fact]
  public void Throttle_Empty()
  {
      var scheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(10);

      var source = scheduler.CreateHotObservable(
          OnNext(150, 0),
          OnCompleted<int>(300)
      );

      var observer = scheduler.Start(() => Observable.Throttle(source, dueTime, scheduler));

      observer.Messages.AssertEqual(
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// The source's only value is at tick 150 and it fails at 300.
  /// Expects the throttled sequence to contain nothing but that same error at 300.
  /// </summary>
  [Fact]
  public void Throttle_Error()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var dueTime = TimeSpan.FromTicks(10);

      var source = scheduler.CreateHotObservable(
          OnNext(150, 0),
          OnError<int>(300, failure)
      );

      var observer = scheduler.Start(() => Observable.Throttle(source, dueTime, scheduler));

      observer.Messages.AssertEqual(
          OnError<int>(300, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// The source emits a single value at tick 150 and never terminates.
  /// Expects no messages at all, and a source subscription lasting from 200 to 1000
  /// (presumably the test scheduler's default subscribe and dispose times).
  /// </summary>
  [Fact]
  public void Throttle_Never()
  {
      var scheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(10);

      var source = scheduler.CreateHotObservable(
          OnNext(150, 0)
      );

      var observer = scheduler.Start(() => Observable.Throttle(source, dueTime, scheduler));

      // Nothing is expected to reach the observer.
      observer.Messages.AssertEqual();

      source.Subscriptions.AssertEqual(Subscribe(200, 1000));
  }
  /// <summary>
  /// Mixed case with a 20-tick due time: value 2 (at 240) is followed by value 3 only
  /// 10 ticks later and is not expected in the output, while 1, 3 and 4 are.
  /// Expects 1 at 230, 3 at 270, 4 at 300, then completion at 300.
  /// </summary>
  [Fact]
  public void Throttle_Simple()
  {
      var scheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(20);

      var source = scheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(250, 3),
          OnNext(280, 4),
          OnCompleted<int>(300)
      );

      var observer = scheduler.Start(() => Observable.Throttle(source, dueTime, scheduler));

      observer.Messages.AssertEqual(
          OnNext(230, 1),
          OnNext(270, 3),
          OnNext(300, 4),
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// Exercises the Throttle overload that takes no scheduler argument: a single-element
  /// sequence throttled by one millisecond, enumerated through ToEnumerable, must yield
  /// exactly that one element.
  /// </summary>
  [Fact]
  public void Throttle_DefaultScheduler()
  {
      var expected = new[] { 1 };

      var throttled = Observable.Return(1).Throttle(TimeSpan.FromMilliseconds(1));
      var actual = throttled.ToEnumerable();

      Assert.True(actual.SequenceEqual(expected));
  }
  /// <summary>
  /// Checks argument validation of the selector-based Throttle overload: both a null
  /// source and a null duration selector must raise ArgumentNullException.
  /// </summary>
  [Fact]
  public void Throttle_Duration_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var missingSource = default(IObservable<int>);
      var missingSelector = default(Func<int, IObservable<string>>);

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(missingSource, x => source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(source, missingSelector));
  }
  /// <summary>
  /// Selector-based Throttle where every value gets an identical duration sequence whose
  /// first element arrives after 20 ticks, and source values are at least 30 ticks apart.
  /// Expects every value 0..4 to be forwarded 20 ticks after its arrival, and each
  /// duration sequence to be subscribed from the value's arrival until 20 ticks later.
  /// </summary>
  [Fact]
  public void Throttle_Duration_DelayBehavior()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(150, -1),
          OnNext(250, 0),
          OnNext(280, 1),
          OnNext(310, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          OnCompleted<int>(550)
      );

      // One distinct cold observable per source value 0..4, all with the same timing,
      // so that each one's subscriptions can be asserted separately below.
      var durations = Enumerable.Range(0, 5)
          .Select(_ => scheduler.CreateColdObservable(
              OnNext(20, 42),
              OnNext(25, 99)))
          .ToArray();

      var observer = scheduler.Start(() => source.Throttle(x => durations[x]));

      observer.Messages.AssertEqual(
          OnNext<int>(270, 0),
          OnNext<int>(300, 1),
          OnNext<int>(330, 2),
          OnNext<int>(370, 3),
          OnNext<int>(420, 4),
          OnCompleted<int>(550)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 550));

      // Arrival tick of source value i; its duration sequence is expected to be
      // subscribed at that tick and released 20 ticks later.
      long[] arrivals = { 250, 280, 310, 350, 400 };
      for (var i = 0; i < arrivals.Length; i++)
      {
          durations[i].Subscriptions.AssertEqual(Subscribe(arrivals[i], arrivals[i] + 20));
      }
  }
  /// <summary>
  /// Selector-based Throttle with per-value durations of 20, 40, 20, 60 and 20 ticks.
  /// Values 1 and 3 are followed by the next source value before their duration sequence
  /// produces an element, and are not expected in the output; values 0, 2 and 4 are
  /// expected 20 ticks after arrival. Also asserts that the duration sequences of the
  /// omitted values are unsubscribed when the next source value arrives.
  /// </summary>
  [Fact]
  public void Throttle_Duration_ThrottleBehavior()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(150, -1),
          OnNext(250, 0),
          OnNext(280, 1),
          OnNext(310, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          OnCompleted<int>(550)
      );

      // Builds a duration sequence whose first element arrives after the given number
      // of ticks and whose second element arrives 5 ticks after that.
      Func<long, ITestableObservable<int>> durationOf = ticks =>
          scheduler.CreateColdObservable(
              OnNext(ticks, 42),
              OnNext(ticks + 5, 99));

      var durations = new[]
      {
          durationOf(20),
          durationOf(40),
          durationOf(20),
          durationOf(60),
          durationOf(20),
      };

      var observer = scheduler.Start(() => source.Throttle(x => durations[x]));

      observer.Messages.AssertEqual(
          OnNext<int>(270, 0),
          OnNext<int>(330, 2),
          OnNext<int>(420, 4),
          OnCompleted<int>(550)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 550));

      durations[0].Subscriptions.AssertEqual(Subscribe(250, 270));
      durations[1].Subscriptions.AssertEqual(Subscribe(280, 310)); // ends when value 2 arrives
      durations[2].Subscriptions.AssertEqual(Subscribe(310, 330));
      durations[3].Subscriptions.AssertEqual(Subscribe(350, 400)); // ends when value 4 arrives
      durations[4].Subscriptions.AssertEqual(Subscribe(400, 420));
  }
  /// <summary>
  /// Same setup as the throttle-behavior case (durations of 20, 40, 20, 60 and 20 ticks),
  /// but the source completes at 410, before the duration sequence of the last value (4,
  /// arrived at 400) produces an element. Expects value 4 to be emitted at 410 right
  /// before completion, and its duration sequence to be unsubscribed at 410.
  /// </summary>
  [Fact]
  public void Throttle_Duration_EarlyCompletion()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(150, -1),
          OnNext(250, 0),
          OnNext(280, 1),
          OnNext(310, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          OnCompleted<int>(410)
      );

      // Builds a duration sequence whose first element arrives after the given number
      // of ticks and whose second element arrives 5 ticks after that.
      Func<long, ITestableObservable<int>> durationOf = ticks =>
          scheduler.CreateColdObservable(
              OnNext(ticks, 42),
              OnNext(ticks + 5, 99));

      var durations = new[]
      {
          durationOf(20),
          durationOf(40),
          durationOf(20),
          durationOf(60),
          durationOf(20),
      };

      var observer = scheduler.Start(() => source.Throttle(x => durations[x]));

      observer.Messages.AssertEqual(
          OnNext<int>(270, 0),
          OnNext<int>(330, 2),
          OnNext<int>(410, 4),
          OnCompleted<int>(410)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 410));

      durations[0].Subscriptions.AssertEqual(Subscribe(250, 270));
      durations[1].Subscriptions.AssertEqual(Subscribe(280, 310)); // ends when value 2 arrives
      durations[2].Subscriptions.AssertEqual(Subscribe(310, 330));
      durations[3].Subscriptions.AssertEqual(Subscribe(350, 400)); // ends when value 4 arrives
      durations[4].Subscriptions.AssertEqual(Subscribe(400, 410)); // ends at source completion
  }
  /// <summary>
  /// Selector-based Throttle where the duration sequence for values below 4 produces its
  /// first element after value * 10 ticks, while the one for value 4 fails after 40 ticks.
  /// Expects values 2 and 3 to be forwarded, then the duration sequence's error to be
  /// propagated at 490, at which point the source subscription also ends.
  /// </summary>
  [Fact]
  public void Throttle_Duration_InnerError()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var failure = new Exception();

      var observer = scheduler.Start(() =>
          source.Throttle(x =>
          {
              if (x < 4)
              {
                  return scheduler.CreateColdObservable(
                      OnNext(x * 10, "Ignore"),
                      OnNext(x * 10 + 5, "Aargh!")
                  );
              }
              else
              {
                  return scheduler.CreateColdObservable(
                      OnError<string>(x * 10, failure)
                  );
              }
          })
      );

      observer.Messages.AssertEqual(
          OnNext<int>(270, 2),             // 250 + 2 * 10
          OnNext<int>(380, 3),             // 350 + 3 * 10
          OnError<int>(490, failure)       // 450 + 4 * 10
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 490));
  }
  /// <summary>
  /// Selector-based Throttle where the source itself fails at 460, while the duration
  /// sequence of the last value (4, arrived at 450) is still pending. Expects values 2
  /// and 3 to be forwarded, then the source's error at 460; value 4 is not expected.
  /// </summary>
  [Fact]
  public void Throttle_Duration_OuterError()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnError<int>(460, failure)
      );

      // Duration sequence for a value: first element after value * 10 ticks.
      Func<int, IObservable<string>> durationSelector = x =>
          scheduler.CreateColdObservable(
              OnNext(x * 10, "Ignore"),
              OnNext(x * 10 + 5, "Aargh!")
          );

      var observer = scheduler.Start(() => source.Throttle(durationSelector));

      observer.Messages.AssertEqual(
          OnNext<int>(270, 2),             // 250 + 2 * 10
          OnNext<int>(380, 3),             // 350 + 3 * 10
          OnError<int>(460, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 460));
  }
  /// <summary>
  /// Selector-based Throttle where the duration selector itself throws for value 4.
  /// Expects values 2 and 3 to be forwarded, then the thrown exception to surface as an
  /// error at 450 (the arrival time of value 4), ending the source subscription there.
  /// </summary>
  [Fact]
  public void Throttle_Duration_SelectorThrows()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var failure = new Exception();

      var observer = scheduler.Start(() =>
          source.Throttle(x =>
          {
              // Values of 4 and above make the selector fail instead of returning a sequence.
              if (x >= 4)
              {
                  throw failure;
              }

              return scheduler.CreateColdObservable(
                  OnNext(x * 10, "Ignore"),
                  OnNext(x * 10 + 5, "Aargh!")
              );
          })
      );

      observer.Messages.AssertEqual(
          OnNext<int>(270, 2),             // 250 + 2 * 10
          OnNext<int>(380, 3),             // 350 + 3 * 10
          OnError<int>(450, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 450));
  }
  /// <summary>
  /// Selector-based Throttle where each duration sequence produces no element and simply
  /// completes after value * 10 ticks. Source values are 100 ticks apart, so no duration
  /// is interrupted. Expects completion of the duration sequence to release the value:
  /// 2 at 270, 3 at 380 and 4 at 490, followed by source completion at 550.
  /// </summary>
  [Fact]
  public void Throttle_Duration_InnerDone_DelayBehavior()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var res = scheduler.Start(() =>
          xs.Throttle(x =>
              scheduler.CreateColdObservable(
                  OnCompleted<string>(x * 10)
              )
          )
      );

      res.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnNext<int>(450 + 4 * 10, 4),
          OnCompleted<int>(550)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  /// <summary>
  /// Selector-based Throttle where each duration sequence produces no element and simply
  /// completes after value * 10 ticks, with some source values arriving before the
  /// previous duration has completed. Value 3 (due at 310) is followed by 4 at 300, and
  /// value 5 (due at 450) is followed by 6 at 410; neither is expected in the output.
  /// Expects 2 at 270, 4 at 340 and 6 at 470, followed by source completion at 550.
  /// </summary>
  [Fact]
  public void Throttle_Duration_InnerDone_ThrottleBehavior()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(280, 3),
          OnNext(300, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnCompleted<int>(550)
      );

      var res = scheduler.Start(() =>
          xs.Throttle(x =>
              scheduler.CreateColdObservable(
                  OnCompleted<string>(x * 10)
              )
          )
      );

      res.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(300 + 4 * 10, 4),
          OnNext<int>(410 + 6 * 10, 6),
          OnCompleted<int>(550)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  553. }
  554. }