1
0

RepeatWhenTest.cs 15 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. using System.Runtime.CompilerServices;
  20. namespace ReactiveTests.Tests
  21. {
  22. public class RepeatWhenTest : ReactiveTest
  23. {
  /// <summary>
  /// Checks that <c>RepeatWhen</c> raises <see cref="ArgumentNullException"/> for a null
  /// source, for a null handler, and when a null observer is passed to <c>Subscribe</c>.
  /// </summary>
  [Fact]
  public void RepeatWhen_ArgumentChecking()
  {
      IObservable<int> missingSource = null;

      // Null source sequence.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.RepeatWhen<int, object>(missingSource, signals => signals));

      // Null handler function.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.RepeatWhen<int, object>(Observable.Return(1), null));

      // Null observer handed to Subscribe on the resulting sequence.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => DummyObservable<int>.Instance.RepeatWhen(signals => signals).Subscribe(null));
  }
  /// <summary>
  /// Checks the case where the handler function itself throws: the test expects the
  /// exception as an <c>OnError</c> at time 200 and no subscription to the source.
  /// </summary>
  [Fact]
  public void RepeatWhen_Handler_Crash()
  {
      var testScheduler = new TestScheduler();
      var failure = new InvalidOperationException();

      var source = testScheduler.CreateColdObservable(
          OnCompleted<int>(10)
      );

      // The handler throws instead of returning a signal sequence.
      var observer = testScheduler.Start(() =>
          source.RepeatWhen<int, object>(signals => throw failure)
      );

      observer.Messages.AssertEqual(
          OnError<int>(200, failure)
      );

      // The source is expected to have no recorded subscriptions.
      source.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Checks the case where the sequence returned by the handler fails: the selector
  /// applied to the signals throws, and the test expects that exception as an
  /// <c>OnError</c> at time 210 with the source subscribed from 200 to 210.
  /// </summary>
  [Fact]
  public void RepeatWhen_Handler_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new InvalidOperationException();

      // Source that only completes, 10 ticks after subscription.
      var source = testScheduler.CreateColdObservable(
          OnCompleted<int>(10)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen<int, object>(
              signals => signals.Select<object, object>(_ => throw failure))
      );

      observer.Messages.AssertEqual(
          OnError<int>(210, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Checks the case where the sequence returned by the handler completes without
  /// emitting: <c>Take(1).Skip(1)</c> drops the only signal it takes. The test expects
  /// an <c>OnCompleted</c> at time 210 with the source subscribed from 200 to 210.
  /// </summary>
  [Fact]
  public void RepeatWhen_Handler_Completed()
  {
      var scheduler = new TestScheduler();

      // Source that only completes, 10 ticks after subscription.
      var xs = scheduler.CreateColdObservable(
          OnCompleted<int>(10)
      );

      var res = scheduler.Start(() =>
          xs.RepeatWhen<int, object>(v => v.Take(1).Skip(1))
      );

      res.Messages.AssertEqual(
          OnCompleted<int>(210)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Checks that disposing the subscription leaves no observers on either the source
  /// subject or the subject returned by the handler.
  /// </summary>
  [Fact]
  public void RepeatWhen_Disposed()
  {
      var sourceSubject = new Subject<int>();
      var signalSubject = new Subject<int>();

      var subscription = sourceSubject.RepeatWhen(_ => signalSubject).Subscribe();

      // Both subjects are observed while the subscription is alive.
      Assert.True(sourceSubject.HasObservers);
      Assert.True(signalSubject.HasObservers);

      subscription.Dispose();

      // Neither subject is observed once it has been disposed.
      Assert.False(sourceSubject.HasObservers);
      Assert.False(signalSubject.HasObservers);
  }
  /// <summary>
  /// Checks that completing the handler's subject leaves no observers on either subject
  /// and that the subscriber sees exactly one completion, no items and no errors.
  /// </summary>
  [Fact]
  public void RepeatWhen_Handler_Completed_Disposes_Main()
  {
      var sourceSubject = new Subject<int>();
      var signalSubject = new Subject<int>();

      var completedCount = 0;
      var itemCount = 0;
      var errorCount = 0;

      sourceSubject.RepeatWhen(_ => signalSubject).Subscribe(
          _ => itemCount++,
          _ => errorCount++,
          () => completedCount++);

      Assert.True(sourceSubject.HasObservers);
      Assert.True(signalSubject.HasObservers);

      // Terminate the handler's sequence normally.
      signalSubject.OnCompleted();

      Assert.False(sourceSubject.HasObservers);
      Assert.False(signalSubject.HasObservers);

      Assert.Equal(0, itemCount);
      Assert.Equal(0, errorCount);
      Assert.Equal(1, completedCount);
  }
  /// <summary>
  /// Checks that an error on the handler's subject leaves no observers on either subject
  /// and that the subscriber sees exactly one error, no items and no completion.
  /// </summary>
  [Fact]
  public void RepeatWhen_Handler_Error_Disposes_Main()
  {
      var sourceSubject = new Subject<int>();
      var signalSubject = new Subject<int>();

      var completedCount = 0;
      var itemCount = 0;
      var errorCount = 0;

      sourceSubject.RepeatWhen(_ => signalSubject).Subscribe(
          _ => itemCount++,
          _ => errorCount++,
          () => completedCount++);

      Assert.True(sourceSubject.HasObservers);
      Assert.True(signalSubject.HasObservers);

      // Terminate the handler's sequence with a failure.
      signalSubject.OnError(new InvalidOperationException());

      Assert.False(sourceSubject.HasObservers);
      Assert.False(signalSubject.HasObservers);

      Assert.Equal(0, itemCount);
      Assert.Equal(1, errorCount);
      Assert.Equal(0, completedCount);
  }
  /// <summary>
  /// Runs a cold source (1, 2, 3, then completion at 250) through <c>RepeatWhen</c> with
  /// a handler that returns the signal sequence unchanged. The test expects the three
  /// values to appear three times and four back-to-back subscriptions, the last one
  /// ending at 1000.
  /// </summary>
  [Fact]
  public void RepeatWhen_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals => signals)
      );

      observer.Messages.AssertEqual(
          // first pass
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3),
          // second pass
          OnNext(550, 1),
          OnNext(600, 2),
          OnNext(650, 3),
          // third pass
          OnNext(800, 1),
          OnNext(850, 2),
          OnNext(900, 3)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950),
          Subscribe(950, 1000)
      );
  }
  /// <summary>
  /// Runs a cold source that emits 1, 2, 3 and never terminates through
  /// <c>RepeatWhen</c>. The test expects the values once and a single subscription
  /// from 200 to 1000.
  /// </summary>
  [Fact]
  public void RepeatWhen_Infinite()
  {
      var testScheduler = new TestScheduler();

      // No terminal notification in this source.
      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals => signals)
      );

      observer.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Runs a cold source that emits 1, 2, 3 and then fails through <c>RepeatWhen</c>.
  /// The test expects the values followed by the same error at 450, with a single
  /// subscription from 200 to 450.
  /// </summary>
  [Fact]
  public void RepeatWhen_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3),
          OnError<int>(250, failure)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals => signals)
      );

      observer.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3),
          OnError<int>(450, failure)
      );

      // Only one subscription is expected.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 450)
      );
  }
  /// <summary>
  /// Exercises exceptions thrown by observer callbacks and by the subscribe function
  /// when <c>RepeatWhen</c> is used with a pass-through handler. Four scenarios:
  /// a throwing <c>onNext</c>, a throwing <c>onError</c>, a throwing
  /// <c>onCompleted</c> whose subscription is disposed at 210, and a source whose
  /// subscribe function throws.
  /// </summary>
  [Fact]
  public void RepeatWhen_Throws()
  {
      // 1) onNext throws: running the scheduler is expected to surface the exception.
      var nextScheduler = new TestScheduler();
      var returning = Observable.Return(1, nextScheduler).RepeatWhen(signals => signals);
      returning.Subscribe(value => { throw new InvalidOperationException(); });
      ReactiveAssert.Throws<InvalidOperationException>(() => nextScheduler.Start());

      // 2) onError throws: running the scheduler is expected to surface the exception.
      var errorScheduler = new TestScheduler();
      var failing = Observable.Throw<int>(new Exception(), errorScheduler).RepeatWhen(signals => signals);
      failing.Subscribe(value => { }, error => { throw new InvalidOperationException(); });
      ReactiveAssert.Throws<InvalidOperationException>(() => errorScheduler.Start());

      // 3) onCompleted would throw; the subscription is disposed at 210 and the
      //    scheduler is run without expecting an exception.
      var completedScheduler = new TestScheduler();
      var repeating = Observable.Return(1, completedScheduler).RepeatWhen(signals => signals);
      var subscription = repeating.Subscribe(
          value => { },
          error => { },
          () => { throw new InvalidOperationException(); });
      completedScheduler.ScheduleAbsolute(210, () => subscription.Dispose());
      completedScheduler.Start();

      // 4) The subscribe function itself throws: Subscribe is expected to throw.
      var throwingOnSubscribe = Observable
          .Create<int>(new Func<IObserver<int>, Action>(observer => { throw new InvalidOperationException(); }))
          .RepeatWhen(signals => signals);
      ReactiveAssert.Throws<InvalidOperationException>(() => throwingOnSubscribe.Subscribe());
  }
  /// <summary>
  /// Uses a handler that lets signals through only while a pre-incremented counter
  /// stays below 3. The test expects the source values (1, 2, 3) three times,
  /// completion at 260, and three consecutive subscriptions of 20 ticks each.
  /// </summary>
  [Fact]
  public void RepeatWhen_RepeatCount_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(5, 1),
          OnNext(10, 2),
          OnNext(15, 3),
          OnCompleted<int>(20)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals =>
          {
              // Counter is local to this handler invocation.
              var seen = 0;
              return signals.TakeWhile(_ => ++seen < 3);
          })
      );

      observer.Messages.AssertEqual(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(225, 1),
          OnNext(230, 2),
          OnNext(235, 3),
          OnNext(245, 1),
          OnNext(250, 2),
          OnNext(255, 3),
          OnCompleted<int>(260)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 220),
          Subscribe(220, 240),
          Subscribe(240, 260)
      );
  }
  /// <summary>
  /// Same setup as the counting-handler scenario, but 231 is passed to
  /// <c>Start</c> as its second argument. The test expects only the values up to
  /// time 230 and a second subscription that ends at 231.
  /// </summary>
  [Fact]
  public void RepeatWhen_RepeatCount_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(5, 1),
          OnNext(10, 2),
          OnNext(15, 3),
          OnCompleted<int>(20)
      );

      var observer = testScheduler.Start(
          () => source.RepeatWhen(signals =>
          {
              var seen = 0;
              return signals.TakeWhile(_ => ++seen < 3);
          }),
          231
      );

      observer.Messages.AssertEqual(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(225, 1),
          OnNext(230, 2)
      );

      // The second subscription is cut short at 231.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 220),
          Subscribe(220, 231)
      );
  }
  /// <summary>
  /// Runs a source that never terminates through <c>RepeatWhen</c> with the counting
  /// handler. The test expects the values once and a single subscription from 200
  /// to 1000.
  /// </summary>
  [Fact]
  public void RepeatWhen_RepeatCount_Infinite()
  {
      var testScheduler = new TestScheduler();

      // No terminal notification in this source.
      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals =>
          {
              var seen = 0;
              return signals.TakeWhile(_ => ++seen < 3);
          })
      );

      observer.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Runs a source that fails at 250 through <c>RepeatWhen</c> with the counting
  /// handler. The test expects the values followed by the same error at 450 and a
  /// single subscription from 200 to 450.
  /// </summary>
  [Fact]
  public void RepeatWhen_RepeatCount_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateColdObservable(
          OnNext(100, 1),
          OnNext(150, 2),
          OnNext(200, 3),
          OnError<int>(250, failure)
      );

      var observer = testScheduler.Start(() =>
          source.RepeatWhen(signals =>
          {
              var seen = 0;
              return signals.TakeWhile(_ => ++seen < 3);
          })
      );

      observer.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(350, 2),
          OnNext(400, 3),
          OnError<int>(450, failure)
      );

      // Only one subscription is expected.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 450)
      );
  }
  /// <summary>
  /// Exercises exceptions thrown by observer callbacks and by the subscribe function
  /// when <c>RepeatWhen</c> is used with a counting handler. Four scenarios:
  /// a throwing <c>onNext</c>, a throwing <c>onError</c>, a throwing
  /// <c>onCompleted</c> whose subscription is disposed at 10 (handler limit 100),
  /// and a source whose subscribe function throws.
  /// </summary>
  [Fact]
  public void RepeatWhen_RepeatCount_Throws()
  {
      // 1) onNext throws: running the scheduler is expected to surface the exception.
      var nextScheduler = new TestScheduler();
      var returning = Observable.Return(1, nextScheduler).RepeatWhen(signals =>
      {
          var seen = 0;
          return signals.TakeWhile(_ => ++seen < 3);
      });
      returning.Subscribe(value => { throw new InvalidOperationException(); });
      ReactiveAssert.Throws<InvalidOperationException>(() => nextScheduler.Start());

      // 2) onError throws: running the scheduler is expected to surface the exception.
      var errorScheduler = new TestScheduler();
      var failing = Observable.Throw<int>(new Exception(), errorScheduler).RepeatWhen(signals =>
      {
          var seen = 0;
          return signals.TakeWhile(_ => ++seen < 3);
      });
      failing.Subscribe(value => { }, error => { throw new InvalidOperationException(); });
      ReactiveAssert.Throws<InvalidOperationException>(() => errorScheduler.Start());

      // 3) onCompleted would throw; the subscription is disposed at 10 and the
      //    scheduler is run without expecting an exception.
      var completedScheduler = new TestScheduler();
      var repeating = Observable.Return(1, completedScheduler).RepeatWhen(signals =>
      {
          var seen = 0;
          return signals.TakeWhile(_ => ++seen < 100);
      });
      var subscription = repeating.Subscribe(
          value => { },
          error => { },
          () => { throw new InvalidOperationException(); });
      completedScheduler.ScheduleAbsolute(10, () => subscription.Dispose());
      completedScheduler.Start();

      // 4) The subscribe function itself throws: Subscribe is expected to throw.
      var throwingOnSubscribe = Observable
          .Create<int>(new Func<IObserver<int>, Action>(observer => { throw new InvalidOperationException(); }))
          .RepeatWhen(signals =>
          {
              var seen = 0;
              return signals.TakeWhile(_ => ++seen < 3);
          });
      ReactiveAssert.Throws<InvalidOperationException>(() => throwingOnSubscribe.Subscribe());
  }
  /// <summary>
  /// Uses a handler that maps each signal to a delayed inner sequence: the first
  /// signal is delayed by 100 ticks, the second by 200 ticks, and the third is mapped
  /// to a failing sequence. The test expects three passes over the source
  /// (subscriptions at 200, 320 and 540) followed by the error at 560.
  /// </summary>
  [Fact]
  public void RepeatWhen_Observable_Repeat_Delayed()
  {
      var scheduler = new TestScheduler();
      var ex = new Exception();

      var xs = scheduler.CreateColdObservable(
          OnNext(5, 1),
          OnNext(10, 2),
          OnNext(15, 3),
          OnCompleted<int>(20)
      );

      var res = scheduler.Start(() =>
          xs.RepeatWhen(v =>
          {
              // A captured local is mutable from the inner lambda; no array wrapper needed.
              var count = 0;
              return v.SelectMany(w =>
              {
                  var c = ++count;
                  if (c == 3)
                  {
                      // Third signal: fail instead of scheduling another repeat.
                      return Observable.Throw<int>(ex);
                  }

                  // Delay grows with the signal index (100 ticks, then 200 ticks).
                  return Observable.Return(1).Delay(TimeSpan.FromTicks(c * 100), scheduler);
              });
          })
      );

      res.Messages.AssertEqual(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(215, 3),
          OnNext(325, 1),
          OnNext(330, 2),
          OnNext(335, 3),
          OnNext(545, 1),
          OnNext(550, 2),
          OnNext(555, 3),
          OnError<int>(560, ex)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 220),
          Subscribe(320, 340),
          Subscribe(540, 560)
      );
  }
  428. }
  429. }