ThrottleTest.cs 19 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Linq;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. using Assert = Xunit.Assert;
  13. namespace ReactiveTests.Tests
  14. {
  15. [TestClass]
  16. public class ThrottleTest : ReactiveTest
  17. {
  18. [TestMethod]
  19. public void Throttle_ArgumentChecking()
  20. {
  21. var scheduler = new TestScheduler();
  22. var someObservable = Observable.Empty<int>();
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(default(IObservable<int>), TimeSpan.Zero));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(someObservable, TimeSpan.Zero, null));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(default(IObservable<int>), TimeSpan.Zero, scheduler));
  26. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(someObservable, TimeSpan.FromSeconds(-1)));
  27. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(someObservable, TimeSpan.FromSeconds(-1), scheduler));
  28. }
  29. [TestMethod]
  30. public void Throttle_TimeSpan_AllPass()
  31. {
  32. var scheduler = new TestScheduler();
  33. var xs = scheduler.CreateHotObservable(
  34. OnNext(150, 0),
  35. OnNext(210, 1),
  36. OnNext(240, 2),
  37. OnNext(270, 3),
  38. OnNext(300, 4),
  39. OnCompleted<int>(400)
  40. );
  41. var res = scheduler.Start(() =>
  42. xs.Throttle(TimeSpan.FromTicks(20), scheduler)
  43. );
  44. res.Messages.AssertEqual(
  45. OnNext(230, 1),
  46. OnNext(260, 2),
  47. OnNext(290, 3),
  48. OnNext(320, 4),
  49. OnCompleted<int>(400)
  50. );
  51. xs.Subscriptions.AssertEqual(
  52. Subscribe(200, 400)
  53. );
  54. }
  55. [TestMethod]
  56. public void Throttle_TimeSpan_AllPass_ErrorEnd()
  57. {
  58. var ex = new Exception();
  59. var scheduler = new TestScheduler();
  60. var xs = scheduler.CreateHotObservable(
  61. OnNext(150, 0),
  62. OnNext(210, 1),
  63. OnNext(240, 2),
  64. OnNext(270, 3),
  65. OnNext(300, 4),
  66. OnError<int>(400, ex)
  67. );
  68. var res = scheduler.Start(() =>
  69. xs.Throttle(TimeSpan.FromTicks(20), scheduler)
  70. );
  71. res.Messages.AssertEqual(
  72. OnNext(230, 1),
  73. OnNext(260, 2),
  74. OnNext(290, 3),
  75. OnNext(320, 4),
  76. OnError<int>(400, ex)
  77. );
  78. xs.Subscriptions.AssertEqual(
  79. Subscribe(200, 400)
  80. );
  81. }
  82. [TestMethod]
  83. public void Throttle_TimeSpan_AllDrop()
  84. {
  85. var scheduler = new TestScheduler();
  86. var xs = scheduler.CreateHotObservable(
  87. OnNext(150, 0),
  88. OnNext(210, 1),
  89. OnNext(240, 2),
  90. OnNext(270, 3),
  91. OnNext(300, 4),
  92. OnNext(330, 5),
  93. OnNext(360, 6),
  94. OnNext(390, 7),
  95. OnCompleted<int>(400)
  96. );
  97. var res = scheduler.Start(() =>
  98. xs.Throttle(TimeSpan.FromTicks(40), scheduler)
  99. );
  100. res.Messages.AssertEqual(
  101. OnNext(400, 7),
  102. OnCompleted<int>(400)
  103. );
  104. xs.Subscriptions.AssertEqual(
  105. Subscribe(200, 400)
  106. );
  107. }
  108. [TestMethod]
  109. public void Throttle_TimeSpan_AllDrop_ErrorEnd()
  110. {
  111. var ex = new Exception();
  112. var scheduler = new TestScheduler();
  113. var xs = scheduler.CreateHotObservable(
  114. OnNext(150, 0),
  115. OnNext(210, 1),
  116. OnNext(240, 2),
  117. OnNext(270, 3),
  118. OnNext(300, 4),
  119. OnNext(330, 5),
  120. OnNext(360, 6),
  121. OnNext(390, 7),
  122. OnError<int>(400, ex)
  123. );
  124. var res = scheduler.Start(() =>
  125. xs.Throttle(TimeSpan.FromTicks(40), scheduler)
  126. );
  127. res.Messages.AssertEqual(
  128. OnError<int>(400, ex)
  129. );
  130. xs.Subscriptions.AssertEqual(
  131. Subscribe(200, 400)
  132. );
  133. }
  134. [TestMethod]
  135. public void Throttle_Empty()
  136. {
  137. var scheduler = new TestScheduler();
  138. var xs = scheduler.CreateHotObservable(
  139. OnNext(150, 0),
  140. OnCompleted<int>(300)
  141. );
  142. var res = scheduler.Start(() =>
  143. xs.Throttle(TimeSpan.FromTicks(10), scheduler)
  144. );
  145. res.Messages.AssertEqual(
  146. OnCompleted<int>(300)
  147. );
  148. xs.Subscriptions.AssertEqual(
  149. Subscribe(200, 300)
  150. );
  151. }
  152. [TestMethod]
  153. public void Throttle_Error()
  154. {
  155. var scheduler = new TestScheduler();
  156. var ex = new Exception();
  157. var xs = scheduler.CreateHotObservable(
  158. OnNext(150, 0),
  159. OnError<int>(300, ex)
  160. );
  161. var res = scheduler.Start(() =>
  162. xs.Throttle(TimeSpan.FromTicks(10), scheduler)
  163. );
  164. res.Messages.AssertEqual(
  165. OnError<int>(300, ex)
  166. );
  167. xs.Subscriptions.AssertEqual(
  168. Subscribe(200, 300)
  169. );
  170. }
  171. [TestMethod]
  172. public void Throttle_Never()
  173. {
  174. var scheduler = new TestScheduler();
  175. var xs = scheduler.CreateHotObservable(
  176. OnNext(150, 0)
  177. );
  178. var res = scheduler.Start(() =>
  179. xs.Throttle(TimeSpan.FromTicks(10), scheduler)
  180. );
  181. res.Messages.AssertEqual(
  182. );
  183. xs.Subscriptions.AssertEqual(
  184. Subscribe(200, 1000)
  185. );
  186. }
  187. [TestMethod]
  188. public void Throttle_Simple()
  189. {
  190. var scheduler = new TestScheduler();
  191. var xs = scheduler.CreateHotObservable(
  192. OnNext(150, 0),
  193. OnNext(210, 1),
  194. OnNext(240, 2),
  195. OnNext(250, 3),
  196. OnNext(280, 4),
  197. OnCompleted<int>(300)
  198. );
  199. var res = scheduler.Start(() =>
  200. xs.Throttle(TimeSpan.FromTicks(20), scheduler)
  201. );
  202. res.Messages.AssertEqual(
  203. OnNext(230, 1),
  204. OnNext(270, 3),
  205. OnNext(300, 4),
  206. OnCompleted<int>(300)
  207. );
  208. xs.Subscriptions.AssertEqual(
  209. Subscribe(200, 300)
  210. );
  211. }
  212. [TestMethod]
  213. public void Throttle_DefaultScheduler()
  214. {
  215. Assert.True(Observable.Return(1).Throttle(TimeSpan.FromMilliseconds(1)).ToEnumerable().SequenceEqual([1]));
  216. }
  217. [TestMethod]
  218. public void Throttle_Duration_ArgumentChecking()
  219. {
  220. var someObservable = DummyObservable<int>.Instance;
  221. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(default(IObservable<int>), x => someObservable));
  222. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(someObservable, default(Func<int, IObservable<string>>)));
  223. }
  224. [TestMethod]
  225. public void Throttle_Duration_DelayBehavior()
  226. {
  227. var scheduler = new TestScheduler();
  228. var xs = scheduler.CreateHotObservable(
  229. OnNext(150, -1),
  230. OnNext(250, 0),
  231. OnNext(280, 1),
  232. OnNext(310, 2),
  233. OnNext(350, 3),
  234. OnNext(400, 4),
  235. OnCompleted<int>(550)
  236. );
  237. var ys = new[] {
  238. scheduler.CreateColdObservable(
  239. OnNext(20, 42),
  240. OnNext(25, 99)
  241. ),
  242. scheduler.CreateColdObservable(
  243. OnNext(20, 42),
  244. OnNext(25, 99)
  245. ),
  246. scheduler.CreateColdObservable(
  247. OnNext(20, 42),
  248. OnNext(25, 99)
  249. ),
  250. scheduler.CreateColdObservable(
  251. OnNext(20, 42),
  252. OnNext(25, 99)
  253. ),
  254. scheduler.CreateColdObservable(
  255. OnNext(20, 42),
  256. OnNext(25, 99)
  257. ),
  258. };
  259. var res = scheduler.Start(() =>
  260. xs.Throttle(x => ys[x])
  261. );
  262. res.Messages.AssertEqual(
  263. OnNext(250 + 20, 0),
  264. OnNext(280 + 20, 1),
  265. OnNext(310 + 20, 2),
  266. OnNext(350 + 20, 3),
  267. OnNext(400 + 20, 4),
  268. OnCompleted<int>(550)
  269. );
  270. xs.Subscriptions.AssertEqual(
  271. Subscribe(200, 550)
  272. );
  273. ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
  274. ys[1].Subscriptions.AssertEqual(Subscribe(280, 280 + 20));
  275. ys[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20));
  276. ys[3].Subscriptions.AssertEqual(Subscribe(350, 350 + 20));
  277. ys[4].Subscriptions.AssertEqual(Subscribe(400, 400 + 20));
  278. }
  279. [TestMethod]
  280. public void Throttle_Duration_ThrottleBehavior()
  281. {
  282. var scheduler = new TestScheduler();
  283. var xs = scheduler.CreateHotObservable(
  284. OnNext(150, -1),
  285. OnNext(250, 0),
  286. OnNext(280, 1),
  287. OnNext(310, 2),
  288. OnNext(350, 3),
  289. OnNext(400, 4),
  290. OnCompleted<int>(550)
  291. );
  292. var ys = new[] {
  293. scheduler.CreateColdObservable(
  294. OnNext(20, 42),
  295. OnNext(25, 99)
  296. ),
  297. scheduler.CreateColdObservable(
  298. OnNext(40, 42),
  299. OnNext(45, 99)
  300. ),
  301. scheduler.CreateColdObservable(
  302. OnNext(20, 42),
  303. OnNext(25, 99)
  304. ),
  305. scheduler.CreateColdObservable(
  306. OnNext(60, 42),
  307. OnNext(65, 99)
  308. ),
  309. scheduler.CreateColdObservable(
  310. OnNext(20, 42),
  311. OnNext(25, 99)
  312. ),
  313. };
  314. var res = scheduler.Start(() =>
  315. xs.Throttle(x => ys[x])
  316. );
  317. res.Messages.AssertEqual(
  318. OnNext(250 + 20, 0),
  319. OnNext(310 + 20, 2),
  320. OnNext(400 + 20, 4),
  321. OnCompleted<int>(550)
  322. );
  323. xs.Subscriptions.AssertEqual(
  324. Subscribe(200, 550)
  325. );
  326. ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
  327. ys[1].Subscriptions.AssertEqual(Subscribe(280, 310));
  328. ys[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20));
  329. ys[3].Subscriptions.AssertEqual(Subscribe(350, 400));
  330. ys[4].Subscriptions.AssertEqual(Subscribe(400, 400 + 20));
  331. }
  332. [TestMethod]
  333. public void Throttle_Duration_EarlyCompletion()
  334. {
  335. var scheduler = new TestScheduler();
  336. var xs = scheduler.CreateHotObservable(
  337. OnNext(150, -1),
  338. OnNext(250, 0),
  339. OnNext(280, 1),
  340. OnNext(310, 2),
  341. OnNext(350, 3),
  342. OnNext(400, 4),
  343. OnCompleted<int>(410)
  344. );
  345. var ys = new[] {
  346. scheduler.CreateColdObservable(
  347. OnNext(20, 42),
  348. OnNext(25, 99)
  349. ),
  350. scheduler.CreateColdObservable(
  351. OnNext(40, 42),
  352. OnNext(45, 99)
  353. ),
  354. scheduler.CreateColdObservable(
  355. OnNext(20, 42),
  356. OnNext(25, 99)
  357. ),
  358. scheduler.CreateColdObservable(
  359. OnNext(60, 42),
  360. OnNext(65, 99)
  361. ),
  362. scheduler.CreateColdObservable(
  363. OnNext(20, 42),
  364. OnNext(25, 99)
  365. ),
  366. };
  367. var res = scheduler.Start(() =>
  368. xs.Throttle(x => ys[x])
  369. );
  370. res.Messages.AssertEqual(
  371. OnNext(250 + 20, 0),
  372. OnNext(310 + 20, 2),
  373. OnNext(410, 4),
  374. OnCompleted<int>(410)
  375. );
  376. xs.Subscriptions.AssertEqual(
  377. Subscribe(200, 410)
  378. );
  379. ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
  380. ys[1].Subscriptions.AssertEqual(Subscribe(280, 310));
  381. ys[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20));
  382. ys[3].Subscriptions.AssertEqual(Subscribe(350, 400));
  383. ys[4].Subscriptions.AssertEqual(Subscribe(400, 410));
  384. }
  385. [TestMethod]
  386. public void Throttle_Duration_InnerError()
  387. {
  388. var scheduler = new TestScheduler();
  389. var xs = scheduler.CreateHotObservable(
  390. OnNext(150, 1),
  391. OnNext(250, 2),
  392. OnNext(350, 3),
  393. OnNext(450, 4),
  394. OnCompleted<int>(550)
  395. );
  396. var ex = new Exception();
  397. var res = scheduler.Start(() =>
  398. xs.Throttle(x =>
  399. x < 4 ? scheduler.CreateColdObservable(
  400. OnNext(x * 10, "Ignore"),
  401. OnNext(x * 10 + 5, "Aargh!")
  402. )
  403. : scheduler.CreateColdObservable(
  404. OnError<string>(x * 10, ex)
  405. )
  406. )
  407. );
  408. res.Messages.AssertEqual(
  409. OnNext(250 + 2 * 10, 2),
  410. OnNext(350 + 3 * 10, 3),
  411. OnError<int>(450 + 4 * 10, ex)
  412. );
  413. xs.Subscriptions.AssertEqual(
  414. Subscribe(200, 490)
  415. );
  416. }
  417. [TestMethod]
  418. public void Throttle_Duration_OuterError()
  419. {
  420. var scheduler = new TestScheduler();
  421. var ex = new Exception();
  422. var xs = scheduler.CreateHotObservable(
  423. OnNext(150, 1),
  424. OnNext(250, 2),
  425. OnNext(350, 3),
  426. OnNext(450, 4),
  427. OnError<int>(460, ex)
  428. );
  429. var res = scheduler.Start(() =>
  430. xs.Throttle(x =>
  431. scheduler.CreateColdObservable(
  432. OnNext(x * 10, "Ignore"),
  433. OnNext(x * 10 + 5, "Aargh!")
  434. )
  435. )
  436. );
  437. res.Messages.AssertEqual(
  438. OnNext(250 + 2 * 10, 2),
  439. OnNext(350 + 3 * 10, 3),
  440. OnError<int>(460, ex)
  441. );
  442. xs.Subscriptions.AssertEqual(
  443. Subscribe(200, 460)
  444. );
  445. }
  446. [TestMethod]
  447. public void Throttle_Duration_SelectorThrows()
  448. {
  449. var scheduler = new TestScheduler();
  450. var xs = scheduler.CreateHotObservable(
  451. OnNext(150, 1),
  452. OnNext(250, 2),
  453. OnNext(350, 3),
  454. OnNext(450, 4),
  455. OnCompleted<int>(550)
  456. );
  457. var ex = new Exception();
  458. var res = scheduler.Start(() =>
  459. xs.Throttle(x =>
  460. {
  461. if (x < 4)
  462. {
  463. return scheduler.CreateColdObservable(
  464. OnNext(x * 10, "Ignore"),
  465. OnNext(x * 10 + 5, "Aargh!")
  466. );
  467. }
  468. else
  469. {
  470. throw ex;
  471. }
  472. })
  473. );
  474. res.Messages.AssertEqual(
  475. OnNext(250 + 2 * 10, 2),
  476. OnNext(350 + 3 * 10, 3),
  477. OnError<int>(450, ex)
  478. );
  479. xs.Subscriptions.AssertEqual(
  480. Subscribe(200, 450)
  481. );
  482. }
  483. [TestMethod]
  484. public void Throttle_Duration_InnerDone_DelayBehavior()
  485. {
  486. var scheduler = new TestScheduler();
  487. var xs = scheduler.CreateHotObservable(
  488. OnNext(150, 1),
  489. OnNext(250, 2),
  490. OnNext(350, 3),
  491. OnNext(450, 4),
  492. OnCompleted<int>(550)
  493. );
  494. var ex = new Exception();
  495. var res = scheduler.Start(() =>
  496. xs.Throttle(x =>
  497. scheduler.CreateColdObservable(
  498. OnCompleted<string>(x * 10)
  499. )
  500. )
  501. );
  502. res.Messages.AssertEqual(
  503. OnNext(250 + 2 * 10, 2),
  504. OnNext(350 + 3 * 10, 3),
  505. OnNext(450 + 4 * 10, 4),
  506. OnCompleted<int>(550)
  507. );
  508. xs.Subscriptions.AssertEqual(
  509. Subscribe(200, 550)
  510. );
  511. }
  512. [TestMethod]
  513. public void Throttle_Duration_InnerDone_ThrottleBehavior()
  514. {
  515. var scheduler = new TestScheduler();
  516. var xs = scheduler.CreateHotObservable(
  517. OnNext(150, 1),
  518. OnNext(250, 2),
  519. OnNext(280, 3),
  520. OnNext(300, 4),
  521. OnNext(400, 5),
  522. OnNext(410, 6),
  523. OnCompleted<int>(550)
  524. );
  525. var ex = new Exception();
  526. var res = scheduler.Start(() =>
  527. xs.Throttle(x =>
  528. scheduler.CreateColdObservable(
  529. OnCompleted<string>(x * 10)
  530. )
  531. )
  532. );
  533. res.Messages.AssertEqual(
  534. OnNext(250 + 2 * 10, 2),
  535. OnNext(300 + 4 * 10, 4),
  536. OnNext(410 + 6 * 10, 6),
  537. OnCompleted<int>(550)
  538. );
  539. xs.Subscriptions.AssertEqual(
  540. Subscribe(200, 550)
  541. );
  542. }
  543. }
  544. }