RepeatWhenTest.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Xunit;
  12. namespace ReactiveTests.Tests
  13. {
  14. public class RepeatWhenTest : ReactiveTest
  15. {
  16. [Fact]
  17. public void RepeatWhen_ArgumentChecking()
  18. {
  19. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RepeatWhen<int, object>(null, v => v));
  20. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RepeatWhen<int, object>(Observable.Return(1), null));
  21. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.RepeatWhen(v => v).Subscribe(null));
  22. }
  23. [Fact]
  24. public void RepeatWhen_Handler_Crash()
  25. {
  26. var scheduler = new TestScheduler();
  27. var xs = scheduler.CreateColdObservable(
  28. OnCompleted<int>(10)
  29. );
  30. var ex = new InvalidOperationException();
  31. var res = scheduler.Start(() =>
  32. xs.RepeatWhen<int, object>(v => { throw ex; })
  33. );
  34. res.Messages.AssertEqual(
  35. OnError<int>(200, ex)
  36. );
  37. xs.Subscriptions.AssertEqual(
  38. );
  39. }
  40. [Fact]
  41. public void RepeatWhen_Handler_Error()
  42. {
  43. var scheduler = new TestScheduler();
  44. var xs = scheduler.CreateColdObservable(
  45. OnCompleted<int>(10)
  46. );
  47. var ex = new InvalidOperationException();
  48. var res = scheduler.Start(() =>
  49. xs.RepeatWhen<int, object>(v => v.Select<object, object>(w => throw ex))
  50. );
  51. res.Messages.AssertEqual(
  52. OnError<int>(210, ex)
  53. );
  54. xs.Subscriptions.AssertEqual(
  55. Subscribe(200, 210)
  56. );
  57. }
  58. [Fact]
  59. public void RepeatWhen_Handler_Completed()
  60. {
  61. var scheduler = new TestScheduler();
  62. var xs = scheduler.CreateColdObservable(
  63. OnCompleted<int>(10)
  64. );
  65. var ex = new InvalidOperationException();
  66. var res = scheduler.Start(() =>
  67. xs.RepeatWhen<int, object>(v => v.Take(1).Skip(1))
  68. );
  69. res.Messages.AssertEqual(
  70. OnCompleted<int>(210)
  71. );
  72. xs.Subscriptions.AssertEqual(
  73. Subscribe(200, 210)
  74. );
  75. }
  76. [Fact]
  77. public void RepeatWhen_Disposed()
  78. {
  79. var main = new Subject<int>();
  80. var inner = new Subject<int>();
  81. var d = main.RepeatWhen(v => inner).Subscribe();
  82. Assert.True(main.HasObservers);
  83. Assert.True(inner.HasObservers);
  84. d.Dispose();
  85. Assert.False(main.HasObservers);
  86. Assert.False(inner.HasObservers);
  87. }
  88. [Fact]
  89. public void RepeatWhen_Handler_Completed_Disposes_Main()
  90. {
  91. var main = new Subject<int>();
  92. var inner = new Subject<int>();
  93. var end = 0;
  94. var items = 0;
  95. var errors = 0;
  96. main.RepeatWhen(v => inner).Subscribe(
  97. onNext: v => items++,
  98. onError: e => errors++,
  99. onCompleted: () => end++);
  100. Assert.True(main.HasObservers);
  101. Assert.True(inner.HasObservers);
  102. inner.OnCompleted();
  103. Assert.False(main.HasObservers);
  104. Assert.False(inner.HasObservers);
  105. Assert.Equal(0, items);
  106. Assert.Equal(0, errors);
  107. Assert.Equal(1, end);
  108. }
  109. [Fact]
  110. public void RepeatWhen_Handler_Error_Disposes_Main()
  111. {
  112. var main = new Subject<int>();
  113. var inner = new Subject<int>();
  114. var end = 0;
  115. var items = 0;
  116. var errors = 0;
  117. main.RepeatWhen(v => inner).Subscribe(
  118. onNext: v => items++,
  119. onError: e => errors++,
  120. onCompleted: () => end++);
  121. Assert.True(main.HasObservers);
  122. Assert.True(inner.HasObservers);
  123. inner.OnError(new InvalidOperationException());
  124. Assert.False(main.HasObservers);
  125. Assert.False(inner.HasObservers);
  126. Assert.Equal(0, items);
  127. Assert.Equal(1, errors);
  128. Assert.Equal(0, end);
  129. }
  130. [Fact]
  131. public void RepeatWhen_Basic()
  132. {
  133. var scheduler = new TestScheduler();
  134. var xs = scheduler.CreateColdObservable(
  135. OnNext(100, 1),
  136. OnNext(150, 2),
  137. OnNext(200, 3),
  138. OnCompleted<int>(250)
  139. );
  140. var res = scheduler.Start(() =>
  141. xs.RepeatWhen(v => v)
  142. );
  143. res.Messages.AssertEqual(
  144. OnNext(300, 1),
  145. OnNext(350, 2),
  146. OnNext(400, 3),
  147. OnNext(550, 1),
  148. OnNext(600, 2),
  149. OnNext(650, 3),
  150. OnNext(800, 1),
  151. OnNext(850, 2),
  152. OnNext(900, 3)
  153. );
  154. xs.Subscriptions.AssertEqual(
  155. Subscribe(200, 450),
  156. Subscribe(450, 700),
  157. Subscribe(700, 950),
  158. Subscribe(950, 1000)
  159. );
  160. }
  161. [Fact]
  162. public void RepeatWhen_Infinite()
  163. {
  164. var scheduler = new TestScheduler();
  165. var xs = scheduler.CreateColdObservable(
  166. OnNext(100, 1),
  167. OnNext(150, 2),
  168. OnNext(200, 3)
  169. );
  170. var res = scheduler.Start(() =>
  171. xs.RepeatWhen(v => v)
  172. );
  173. res.Messages.AssertEqual(
  174. OnNext(300, 1),
  175. OnNext(350, 2),
  176. OnNext(400, 3)
  177. );
  178. xs.Subscriptions.AssertEqual(
  179. Subscribe(200, 1000)
  180. );
  181. }
  182. [Fact]
  183. public void RepeatWhen_Error()
  184. {
  185. var scheduler = new TestScheduler();
  186. var ex = new Exception();
  187. var xs = scheduler.CreateColdObservable(
  188. OnNext(100, 1),
  189. OnNext(150, 2),
  190. OnNext(200, 3),
  191. OnError<int>(250, ex)
  192. );
  193. var res = scheduler.Start(() =>
  194. xs.RepeatWhen(v => v)
  195. );
  196. res.Messages.AssertEqual(
  197. OnNext(300, 1),
  198. OnNext(350, 2),
  199. OnNext(400, 3),
  200. OnError<int>(450, ex)
  201. );
  202. xs.Subscriptions.AssertEqual(
  203. Subscribe(200, 450)
  204. );
  205. }
  206. [Fact]
  207. public void RepeatWhen_Throws()
  208. {
  209. var scheduler1 = new TestScheduler();
  210. var xs = Observable.Return(1, scheduler1).RepeatWhen(v => v);
  211. xs.Subscribe(x => { throw new InvalidOperationException(); });
  212. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  213. var scheduler2 = new TestScheduler();
  214. var ys = Observable.Throw<int>(new Exception(), scheduler2).RepeatWhen(v => v);
  215. ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  216. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
  217. var scheduler3 = new TestScheduler();
  218. var zs = Observable.Return(1, scheduler3).RepeatWhen(v => v);
  219. var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  220. scheduler3.ScheduleAbsolute(210, () => d.Dispose());
  221. scheduler3.Start();
  222. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).RepeatWhen(v => v);
  223. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  224. }
  225. [Fact]
  226. public void RepeatWhen_RepeatCount_Basic()
  227. {
  228. var scheduler = new TestScheduler();
  229. var xs = scheduler.CreateColdObservable(
  230. OnNext(5, 1),
  231. OnNext(10, 2),
  232. OnNext(15, 3),
  233. OnCompleted<int>(20)
  234. );
  235. var res = scheduler.Start(() =>
  236. xs.RepeatWhen(v =>
  237. {
  238. var count = 0;
  239. return v.TakeWhile(w => ++count < 3);
  240. })
  241. );
  242. res.Messages.AssertEqual(
  243. OnNext(205, 1),
  244. OnNext(210, 2),
  245. OnNext(215, 3),
  246. OnNext(225, 1),
  247. OnNext(230, 2),
  248. OnNext(235, 3),
  249. OnNext(245, 1),
  250. OnNext(250, 2),
  251. OnNext(255, 3),
  252. OnCompleted<int>(260)
  253. );
  254. xs.Subscriptions.AssertEqual(
  255. Subscribe(200, 220),
  256. Subscribe(220, 240),
  257. Subscribe(240, 260)
  258. );
  259. }
  260. [Fact]
  261. public void RepeatWhen_RepeatCount_Dispose()
  262. {
  263. var scheduler = new TestScheduler();
  264. var xs = scheduler.CreateColdObservable(
  265. OnNext(5, 1),
  266. OnNext(10, 2),
  267. OnNext(15, 3),
  268. OnCompleted<int>(20)
  269. );
  270. var res = scheduler.Start(() =>
  271. xs.RepeatWhen(v =>
  272. {
  273. var count = 0;
  274. return v.TakeWhile(w => ++count < 3);
  275. }), 231
  276. );
  277. res.Messages.AssertEqual(
  278. OnNext(205, 1),
  279. OnNext(210, 2),
  280. OnNext(215, 3),
  281. OnNext(225, 1),
  282. OnNext(230, 2)
  283. );
  284. xs.Subscriptions.AssertEqual(
  285. Subscribe(200, 220),
  286. Subscribe(220, 231)
  287. );
  288. }
  289. [Fact]
  290. public void RepeatWhen_RepeatCount_Infinite()
  291. {
  292. var scheduler = new TestScheduler();
  293. var xs = scheduler.CreateColdObservable(
  294. OnNext(100, 1),
  295. OnNext(150, 2),
  296. OnNext(200, 3)
  297. );
  298. var res = scheduler.Start(() =>
  299. xs.RepeatWhen(v =>
  300. {
  301. var count = 0;
  302. return v.TakeWhile(w => ++count < 3);
  303. })
  304. );
  305. res.Messages.AssertEqual(
  306. OnNext(300, 1),
  307. OnNext(350, 2),
  308. OnNext(400, 3)
  309. );
  310. xs.Subscriptions.AssertEqual(
  311. Subscribe(200, 1000)
  312. );
  313. }
  314. [Fact]
  315. public void RepeatWhen_RepeatCount_Error()
  316. {
  317. var scheduler = new TestScheduler();
  318. var ex = new Exception();
  319. var xs = scheduler.CreateColdObservable(
  320. OnNext(100, 1),
  321. OnNext(150, 2),
  322. OnNext(200, 3),
  323. OnError<int>(250, ex)
  324. );
  325. var res = scheduler.Start(() =>
  326. xs.RepeatWhen(v =>
  327. {
  328. var count = 0;
  329. return v.TakeWhile(w => ++count < 3);
  330. })
  331. );
  332. res.Messages.AssertEqual(
  333. OnNext(300, 1),
  334. OnNext(350, 2),
  335. OnNext(400, 3),
  336. OnError<int>(450, ex)
  337. );
  338. xs.Subscriptions.AssertEqual(
  339. Subscribe(200, 450)
  340. );
  341. }
  342. [Fact]
  343. public void RepeatWhen_RepeatCount_Throws()
  344. {
  345. var scheduler1 = new TestScheduler();
  346. var xs = Observable.Return(1, scheduler1).RepeatWhen(v =>
  347. {
  348. var count = 0;
  349. return v.TakeWhile(w => ++count < 3);
  350. });
  351. xs.Subscribe(x => { throw new InvalidOperationException(); });
  352. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  353. var scheduler2 = new TestScheduler();
  354. var ys = Observable.Throw<int>(new Exception(), scheduler2).RepeatWhen(v =>
  355. {
  356. var count = 0;
  357. return v.TakeWhile(w => ++count < 3);
  358. });
  359. ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  360. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
  361. var scheduler3 = new TestScheduler();
  362. var zs = Observable.Return(1, scheduler3).RepeatWhen(v =>
  363. {
  364. var count = 0;
  365. return v.TakeWhile(w => ++count < 100);
  366. });
  367. var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  368. scheduler3.ScheduleAbsolute(10, () => d.Dispose());
  369. scheduler3.Start();
  370. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).RepeatWhen(v =>
  371. {
  372. var count = 0;
  373. return v.TakeWhile(w => ++count < 3);
  374. });
  375. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  376. }
  377. [Fact]
  378. public void RepeatWhen_Observable_Repeat_Delayed()
  379. {
  380. var scheduler = new TestScheduler();
  381. var ex = new Exception();
  382. var xs = scheduler.CreateColdObservable(
  383. OnNext(5, 1),
  384. OnNext(10, 2),
  385. OnNext(15, 3),
  386. OnCompleted<int>(20)
  387. );
  388. var res = scheduler.Start(() =>
  389. xs.RepeatWhen(v =>
  390. {
  391. int[] count = { 0 };
  392. return v.SelectMany(w =>
  393. {
  394. var c = ++count[0];
  395. if (c == 3)
  396. {
  397. return Observable.Throw<int>(ex);
  398. }
  399. return Observable.Return(1).Delay(TimeSpan.FromTicks(c * 100), scheduler);
  400. });
  401. })
  402. );
  403. res.Messages.AssertEqual(
  404. OnNext(205, 1),
  405. OnNext(210, 2),
  406. OnNext(215, 3),
  407. OnNext(325, 1),
  408. OnNext(330, 2),
  409. OnNext(335, 3),
  410. OnNext(545, 1),
  411. OnNext(550, 2),
  412. OnNext(555, 3),
  413. OnError<int>(560, ex)
  414. );
  415. xs.Subscriptions.AssertEqual(
  416. Subscribe(200, 220),
  417. Subscribe(320, 340),
  418. Subscribe(540, 560)
  419. );
  420. }
  421. }
  422. }