// RepeatWhenTest.cs
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. using Assert = Xunit.Assert;
  13. namespace ReactiveTests.Tests
  14. {
  15. [TestClass]
  16. public class RepeatWhenTest : ReactiveTest
  17. {
  /// <summary>
  /// Verifies that <c>RepeatWhen</c> raises <see cref="ArgumentNullException"/> for a null
  /// source, for a null handler, and when <c>Subscribe</c> is given a null observer.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_ArgumentChecking()
  {
      // Null source sequence.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.RepeatWhen<int, object>(null, signals => signals));

      // Null handler function.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.RepeatWhen<int, object>(Observable.Return(1), null));

      // Null observer handed to Subscribe on the resulting sequence.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => DummyObservable<int>.Instance.RepeatWhen(signals => signals).Subscribe(null));
  }
  /// <summary>
  /// Verifies the case where the handler function itself throws: the test expects the
  /// exception as an OnError at 200 and expects no subscription to the source.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Handler_Crash()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnCompleted<int>(10)
      );

      var failure = new InvalidOperationException();

      var observer = testScheduler.Start(() =>
          source.RepeatWhen<int, object>(_ => throw failure)
      );

      observer.Messages.AssertEqual(
          OnError<int>(200, failure)
      );

      // The handler failed, so the source is expected to have no subscriptions at all.
      source.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Verifies the case where the sequence returned by the handler fails when it processes
  /// a signal: the test expects that error as an OnError at 210 and a single source
  /// subscription spanning 200 to 210.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Handler_Error()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnCompleted<int>(10)
      );

      var failure = new InvalidOperationException();

      var observer = testScheduler.Start(() =>
          // The projection throws for every signal it receives.
          source.RepeatWhen(signals => signals.Select<object, object>(_ => throw failure))
      );

      observer.Messages.AssertEqual(
          OnError<int>(210, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Verifies the case where the handler's sequence completes without emitting: the handler
  /// uses <c>Take(1).Skip(1)</c>, and the test expects the result to complete at 210 with a
  /// single source subscription spanning 200 to 210.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Handler_Completed()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateColdObservable(
          OnCompleted<int>(10)
      );

      var res = scheduler.Start(() =>
          // Take(1).Skip(1) consumes the first signal and then completes with no elements.
          xs.RepeatWhen(v => v.Take(1).Skip(1))
      );

      res.Messages.AssertEqual(
          OnCompleted<int>(210)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Verifies that disposing the subscription returned by <c>RepeatWhen(...).Subscribe()</c>
  /// removes the observers from both the source subject and the subject returned by the handler.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Disposed()
  {
      var source = new Subject<int>();
      var signal = new Subject<int>();

      var subscription = source.RepeatWhen(_ => signal).Subscribe();

      // Both subjects are observed while the subscription is alive.
      Assert.True(source.HasObservers);
      Assert.True(signal.HasObservers);

      subscription.Dispose();

      // Neither subject is observed once the subscription has been disposed.
      Assert.False(source.HasObservers);
      Assert.False(signal.HasObservers);
  }
  /// <summary>
  /// Verifies that completing the handler's subject detaches the observers from both the
  /// source subject and the handler's subject, and that the downstream observer sees
  /// exactly one completion, no items and no errors.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Handler_Completed_Disposes_Main()
  {
      var source = new Subject<int>();
      var signal = new Subject<int>();

      var nextCount = 0;
      var errorCount = 0;
      var completedCount = 0;

      source.RepeatWhen(_ => signal).Subscribe(
          _ => { nextCount++; },
          _ => { errorCount++; },
          () => { completedCount++; });

      Assert.True(source.HasObservers);
      Assert.True(signal.HasObservers);

      signal.OnCompleted();

      Assert.False(source.HasObservers);
      Assert.False(signal.HasObservers);

      Assert.Equal(0, nextCount);
      Assert.Equal(0, errorCount);
      Assert.Equal(1, completedCount);
  }
  /// <summary>
  /// Verifies that an error on the handler's subject detaches the observers from both the
  /// source subject and the handler's subject, and that the downstream observer sees
  /// exactly one error, no items and no completion.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Handler_Error_Disposes_Main()
  {
      var source = new Subject<int>();
      var signal = new Subject<int>();

      var nextCount = 0;
      var errorCount = 0;
      var completedCount = 0;

      source.RepeatWhen(_ => signal).Subscribe(
          _ => { nextCount++; },
          _ => { errorCount++; },
          () => { completedCount++; });

      Assert.True(source.HasObservers);
      Assert.True(signal.HasObservers);

      signal.OnError(new InvalidOperationException());

      Assert.False(source.HasObservers);
      Assert.False(signal.HasObservers);

      Assert.Equal(0, nextCount);
      Assert.Equal(1, errorCount);
      Assert.Equal(0, completedCount);
  }
  /// <summary>
  /// Verifies repetition with an identity handler over a cold source that emits three
  /// values and completes at relative time 250. The test expects the three values to be
  /// replayed for each subscription (starting at 200, 450 and 700) and a fourth
  /// subscription from 950 to 1000 that produces no values.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() =>
          // Identity handler: the signal sequence is returned unchanged.
          source.RepeatWhen(signals => signals)
      );

      observer.Messages.AssertEqual(
          // First run (subscribed at 200).
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3),
          // Second run (subscribed at 450).
          OnNext(550, 1),
          OnNext(600, 2),
          OnNext(650, 3),
          // Third run (subscribed at 700).
          OnNext(800, 1),
          OnNext(850, 2),
          OnNext(900, 3)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950),
          Subscribe(950, 1000)
      );
  }
  /// <summary>
  /// Verifies behavior over a cold source that never terminates: the test expects the three
  /// values to be delivered once and a single source subscription from 200 to 1000.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Infinite()
  {
      var testScheduler = new TestScheduler();

      // No OnCompleted/OnError message: the source never terminates.
      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals => signals)
      );

      observer.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Verifies that an error from the source is not repeated: the test expects the three
  /// values followed by the source's error at 450, and a single source subscription
  /// from 200 to 450.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3),
          OnError<int>(250, failure)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals => signals)
      );

      observer.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3),
          OnError<int>(450, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450)
      );
  }
  /// <summary>
  /// Verifies how exceptions thrown by observer callbacks and by the subscribe function
  /// surface when <c>RepeatWhen</c> is used with an identity handler.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Throws()
  {
      // 1) An exception thrown from onNext is expected to escape from the scheduler run.
      var onNextScheduler = new TestScheduler();
      var throwingOnNext = Observable.Return(1, onNextScheduler).RepeatWhen(signals => signals);

      throwingOnNext.Subscribe(_ => { throw new InvalidOperationException(); });

      ReactiveAssert.Throws<InvalidOperationException>(() => onNextScheduler.Start());

      // 2) An exception thrown from onError is expected to escape from the scheduler run.
      var onErrorScheduler = new TestScheduler();
      var throwingOnError = Observable.Throw<int>(new Exception(), onErrorScheduler).RepeatWhen(signals => signals);

      throwingOnError.Subscribe(_ => { }, _ => { throw new InvalidOperationException(); });

      ReactiveAssert.Throws<InvalidOperationException>(() => onErrorScheduler.Start());

      // 3) The onCompleted callback throws, but the subscription is disposed at 210 and
      //    Start is called outside any throw assertion, so the run must return normally.
      var onCompletedScheduler = new TestScheduler();
      var throwingOnCompleted = Observable.Return(1, onCompletedScheduler).RepeatWhen(signals => signals);

      var subscription = throwingOnCompleted.Subscribe(
          _ => { },
          _ => { },
          () => { throw new InvalidOperationException(); });

      onCompletedScheduler.ScheduleAbsolute(210, () => subscription.Dispose());
      onCompletedScheduler.Start();

      // 4) An exception thrown by the subscribe function is expected to escape from Subscribe.
      var throwingSubscribe = Observable
          .Create(new Func<IObserver<int>, Action>(_ => { throw new InvalidOperationException(); }))
          .RepeatWhen(signals => signals);

      ReactiveAssert.Throws<InvalidOperationException>(() => throwingSubscribe.Subscribe());
  }
  /// <summary>
  /// Verifies a bounded repeat: the handler passes signals through
  /// <c>TakeWhile(++count &lt; 3)</c>, and the test expects three runs of the source
  /// (subscribed at 200, 220 and 240) followed by completion at 260.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_RepeatCount_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(5, 1),
          OnNext(10, 2),
          OnNext(15, 3),
          OnCompleted<int>(20)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals =>
          {
              // The counter is local to each handler invocation; the third signal ends the sequence.
              var seen = 0;

              return signals.TakeWhile(_ => ++seen < 3);
          })
      );

      observer.Messages.AssertEqual(
          // First run.
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          // Second run.
          OnNext(225, 1),
          OnNext(230, 2),
          OnNext(235, 3),
          // Third run, then completion.
          OnNext(245, 1),
          OnNext(250, 2),
          OnNext(255, 3),
          OnCompleted<int>(260)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 220),
          Subscribe(220, 240),
          Subscribe(240, 260)
      );
  }
  /// <summary>
  /// Verifies a bounded repeat that is cut short: 231 is passed to <c>Start</c> as the
  /// disposal time, and the test expects the messages to stop after the value at 230 and
  /// the second source subscription to end at 231.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_RepeatCount_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(5, 1),
          OnNext(10, 2),
          OnNext(15, 3),
          OnCompleted<int>(20)
      );

      var observer = testScheduler.Start(
          () => source.RepeatWhen(signals =>
          {
              var seen = 0;

              return signals.TakeWhile(_ => ++seen < 3);
          }),
          231
      );

      observer.Messages.AssertEqual(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(225, 1),
          OnNext(230, 2)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 220),
          Subscribe(220, 231)
      );
  }
  /// <summary>
  /// Verifies a bounded-repeat handler over a cold source that never terminates: the test
  /// expects the three values once and a single source subscription from 200 to 1000.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_RepeatCount_Infinite()
  {
      var testScheduler = new TestScheduler();

      // No terminal message: the source never completes, so the handler never gets a signal to count.
      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals =>
          {
              var seen = 0;

              return signals.TakeWhile(_ => ++seen < 3);
          })
      );

      observer.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Verifies a bounded-repeat handler over a source that fails: the test expects the three
  /// values followed by the source's error at 450, and a single source subscription
  /// from 200 to 450.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_RepeatCount_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3),
          OnError<int>(250, failure)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals =>
          {
              var seen = 0;

              return signals.TakeWhile(_ => ++seen < 3);
          })
      );

      observer.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3),
          OnError<int>(450, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450)
      );
  }
  /// <summary>
  /// Verifies how exceptions thrown by observer callbacks and by the subscribe function
  /// surface when <c>RepeatWhen</c> is used with a counting <c>TakeWhile</c> handler.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_RepeatCount_Throws()
  {
      // Builds a handler that keeps a per-invocation counter and lets signals through
      // while the pre-incremented counter stays below the given limit.
      static Func<IObservable<object>, IObservable<object>> CountingHandler(int limit) =>
          signals =>
          {
              var seen = 0;

              return signals.TakeWhile(_ => ++seen < limit);
          };

      // 1) An exception thrown from onNext is expected to escape from the scheduler run.
      var onNextScheduler = new TestScheduler();
      var throwingOnNext = Observable.Return(1, onNextScheduler).RepeatWhen(CountingHandler(3));

      throwingOnNext.Subscribe(_ => { throw new InvalidOperationException(); });

      ReactiveAssert.Throws<InvalidOperationException>(() => onNextScheduler.Start());

      // 2) An exception thrown from onError is expected to escape from the scheduler run.
      var onErrorScheduler = new TestScheduler();
      var throwingOnError = Observable.Throw<int>(new Exception(), onErrorScheduler).RepeatWhen(CountingHandler(3));

      throwingOnError.Subscribe(_ => { }, _ => { throw new InvalidOperationException(); });

      ReactiveAssert.Throws<InvalidOperationException>(() => onErrorScheduler.Start());

      // 3) The onCompleted callback throws, but the subscription is disposed at 10 and
      //    Start is called outside any throw assertion, so the run must return normally.
      var onCompletedScheduler = new TestScheduler();
      var throwingOnCompleted = Observable.Return(1, onCompletedScheduler).RepeatWhen(CountingHandler(100));

      var subscription = throwingOnCompleted.Subscribe(
          _ => { },
          _ => { },
          () => { throw new InvalidOperationException(); });

      onCompletedScheduler.ScheduleAbsolute(10, () => subscription.Dispose());
      onCompletedScheduler.Start();

      // 4) An exception thrown by the subscribe function is expected to escape from Subscribe.
      var throwingSubscribe = Observable
          .Create(new Func<IObserver<int>, Action>(_ => { throw new InvalidOperationException(); }))
          .RepeatWhen(CountingHandler(3));

      ReactiveAssert.Throws<InvalidOperationException>(() => throwingSubscribe.Subscribe());
  }
  /// <summary>
  /// Verifies a handler that delays each re-subscription: the first completion signal is
  /// answered after 100 ticks, the second after 200 ticks, and the third with an error.
  /// The test expects source subscriptions at 200, 320 and 540 and the error at 560.
  /// </summary>
  [TestMethod]
  public void RepeatWhen_Observable_Repeat_Delayed()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateColdObservable(
          OnNext(5, 1),
          OnNext(10, 2),
          OnNext(15, 3),
          OnCompleted<int>(20)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals =>
          {
              // Number of completion signals seen by this handler invocation.
              var attempts = 0;

              return signals.SelectMany(_ =>
              {
                  var attempt = ++attempts;

                  // The third signal ends the whole sequence with an error; earlier ones
                  // answer after a delay proportional to the attempt number.
                  return attempt == 3
                      ? Observable.Throw<int>(failure)
                      : Observable.Return(1).Delay(TimeSpan.FromTicks(attempt * 100), testScheduler);
              });
          })
      );

      observer.Messages.AssertEqual(
          // First run.
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          // Second run, after the 100-tick delay.
          OnNext(325, 1),
          OnNext(330, 2),
          OnNext(335, 3),
          // Third run, after the 200-tick delay, then the handler's error.
          OnNext(545, 1),
          OnNext(550, 2),
          OnNext(555, 3),
          OnError<int>(560, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 220),
          Subscribe(320, 340),
          Subscribe(540, 560)
      );
  }
  423. }
  424. }