OnErrorResumeNextTest.cs 20 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Linq;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using ReactiveTests.Dummies;
  12. using Xunit;
  13. namespace ReactiveTests.Tests
  14. {
  15. public class OnErrorResumeNextTest : ReactiveTest
  16. {
  /// <summary>
  /// Checks that each OnErrorResumeNext call shape below raises
  /// <see cref="ArgumentNullException"/> when handed a null argument.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_ArgumentChecking()
  {
      var xs = DummyObservable<int>.Instance;

      // Null collection of sources: once as a bare null, once explicitly typed as an enumerable.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext<int>(null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext((IEnumerable<IObservable<int>>)null));

      // Two-argument form: null in either position. The previous version repeated
      // these two assertions verbatim; the duplicates added no coverage and were removed.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext(null, xs));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext(xs, null));
  }
  /// <summary>
  /// Feeds OnErrorResumeNext a <c>RogueEnumerable</c> built around an exception and
  /// expects the only recorded message to be an OnError at tick 200 carrying
  /// that same exception instance.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_IEofIO_GetEnumeratorThrows()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();
      var rogueSources = new RogueEnumerable<IObservable<int>>(failure);

      var observer = testScheduler.Start(() => Observable.OnErrorResumeNext(rogueSources));

      observer.Messages.AssertEqual(
          OnError<int>(200, failure)
      );
  }
  /// <summary>
  /// Chains three cold sources (completes, errors, completes) through the
  /// enumerable-of-observables form and checks that values from all three are
  /// forwarded, the middle error is not surfaced, and each source is subscribed
  /// only after the previous one has terminated.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_IEofIO()
  {
      var scheduler = new TestScheduler();

      var xs1 = scheduler.CreateColdObservable(
          OnNext(10, 1),
          OnNext(20, 2),
          OnNext(30, 3),
          OnCompleted<int>(40)
      );

      var xs2 = scheduler.CreateColdObservable(
          OnNext(10, 4),
          OnNext(20, 5),
          OnError<int>(30, new Exception())
      );

      var xs3 = scheduler.CreateColdObservable(
          OnNext(10, 6),
          OnNext(20, 7),
          OnNext(30, 8),
          OnNext(40, 9),
          OnCompleted<int>(50)
      );

      // Type the sources as IEnumerable explicitly. Passing the array directly lets
      // overload resolution prefer an array-typed (params) overload over the
      // enumerable one, which is not what this test's name says it covers.
      IEnumerable<IObservable<int>> sources = new[] { xs1, xs2, xs3 };

      var res = scheduler.Start(() =>
          Observable.OnErrorResumeNext(sources)
      );

      res.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(250, 4),
          OnNext(260, 5),
          OnNext(280, 6),
          OnNext(290, 7),
          OnNext(300, 8),
          OnNext(310, 9),
          OnCompleted<int>(320)
      );

      xs1.Subscriptions.AssertEqual(
          Subscribe(200, 240)
      );

      xs2.Subscriptions.AssertEqual(
          Subscribe(240, 270)
      );

      xs3.Subscriptions.AssertEqual(
          Subscribe(270, 320)
      );
  }
  /// <summary>
  /// Two hot sources that both complete normally. Expects the values of the
  /// first (from tick 200 on) followed by the values of the second, with the
  /// second subscribed at 230 and the result completing at 250.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_NoErrors()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnCompleted<int>(230)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(240, 4),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => first.OnErrorResumeNext(second));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      );

      // Hand-over from the first source to the second happens at tick 230.
      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 250));
  }
  /// <summary>
  /// Same shape as the no-error case, except the first hot source ends with
  /// OnError at 230. The expected output is unchanged: the error is not
  /// forwarded and the second source is subscribed at 230.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_Error()
  {
      var testScheduler = new TestScheduler();

      var failing = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnError<int>(230, new Exception())
      );

      var fallback = testScheduler.CreateHotObservable(
          OnNext(240, 4),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => failing.OnErrorResumeNext(fallback));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      );

      failing.Subscriptions.AssertEqual(Subscribe(200, 230));
      fallback.Subscriptions.AssertEqual(Subscribe(230, 250));
  }
  /// <summary>
  /// Three hot sources where the first two end in errors (220, 240) and the
  /// third only completes (250). Expects both errors to be skipped and the
  /// result to complete at 250.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_ErrorMultiple()
  {
      var testScheduler = new TestScheduler();

      var firstFailing = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnError<int>(220, new Exception())
      );

      var secondFailing = testScheduler.CreateHotObservable(
          OnNext(230, 3),
          OnError<int>(240, new Exception())
      );

      var completing = testScheduler.CreateHotObservable(
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(
          () => Observable.OnErrorResumeNext(firstFailing, secondFailing, completing));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(230, 3),
          OnCompleted<int>(250)
      );

      // Each source is subscribed at the tick where its predecessor terminated.
      firstFailing.Subscriptions.AssertEqual(Subscribe(200, 220));
      secondFailing.Subscriptions.AssertEqual(Subscribe(220, 240));
      completing.Subscriptions.AssertEqual(Subscribe(240, 250));
  }
  /// <summary>
  /// Runs five hot sources through the parameterless extension form: one with
  /// no values after subscription, one single-value, one two-value, one that
  /// only errors, and one more single-value source. Expects all values in
  /// order, no error, and completion with the last source at 250.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_EmptyReturnThrowAndMore()
  {
      var testScheduler = new TestScheduler();

      // Its only value (tick 150) precedes subscription at 200, so nothing is observed from it.
      var emptyLike = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(205)
      );

      var single = testScheduler.CreateHotObservable(
          OnNext(215, 2),
          OnCompleted<int>(220)
      );

      var pair = testScheduler.CreateHotObservable(
          OnNext(225, 3),
          OnNext(230, 4),
          OnCompleted<int>(235)
      );

      var throwing = testScheduler.CreateHotObservable(
          OnError<int>(240, new Exception())
      );

      var trailing = testScheduler.CreateHotObservable(
          OnNext(245, 5),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(
          () => new[] { emptyLike, single, pair, throwing, trailing }.OnErrorResumeNext());

      observer.Messages.AssertEqual(
          OnNext(215, 2),
          OnNext(225, 3),
          OnNext(230, 4),
          OnNext(245, 5),
          OnCompleted<int>(250)
      );

      emptyLike.Subscriptions.AssertEqual(Subscribe(200, 205));
      single.Subscriptions.AssertEqual(Subscribe(205, 220));
      pair.Subscriptions.AssertEqual(Subscribe(220, 235));
      throwing.Subscriptions.AssertEqual(Subscribe(235, 240));
      trailing.Subscriptions.AssertEqual(Subscribe(240, 250));
  }
  /// <summary>
  /// The final source ends in an error at 230. Expects that error to be
  /// reported as OnCompleted at 230 rather than OnError, i.e. the last source
  /// gets no special treatment.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_LastIsntSpecial()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var completing = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(220)
      );

      var failingLast = testScheduler.CreateHotObservable(
          OnError<int>(230, failure)
      );

      var observer = testScheduler.Start(() => completing.OnErrorResumeNext(failingLast));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnCompleted<int>(230)
      );

      completing.Subscriptions.AssertEqual(Subscribe(200, 220));
      failingLast.Subscriptions.AssertEqual(Subscribe(220, 230));
  }
  /// <summary>
  /// A single hot source that only errors at 230. Expects the result to
  /// complete at 230 instead of propagating the error.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_SingleSourceDoesntThrow()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var onlySource = testScheduler.CreateHotObservable(
          OnError<int>(230, failure)
      );

      var observer = testScheduler.Start(() => Observable.OnErrorResumeNext(onlySource));

      observer.Messages.AssertEqual(
          OnCompleted<int>(230)
      );

      onlySource.Subscriptions.AssertEqual(Subscribe(200, 230));
  }
  /// <summary>
  /// The second source never terminates (its only notification is at tick 150,
  /// before subscription). Expects only the first source's value, no
  /// termination message, and the second subscription held open until 1000.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_EndWithNever()
  {
      // An unused local Exception was removed: this test never produces an error.
      var scheduler = new TestScheduler();

      var o1 = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(220)
      );

      var o2 = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var res = scheduler.Start(() =>
          Observable.OnErrorResumeNext(o1, o2)
      );

      res.Messages.AssertEqual(
          OnNext(210, 2)
      );

      o1.Subscriptions.AssertEqual(
          Subscribe(200, 220)
      );

      o2.Subscriptions.AssertEqual(
          Subscribe(220, 1000)
      );
  }
  /// <summary>
  /// The first source never terminates (its only notification is at tick 150,
  /// before subscription). Expects no messages at all, the first subscription
  /// held open until 1000, and the second source never subscribed.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_StartWithNever()
  {
      // An unused local Exception was removed: this test never produces an error.
      var scheduler = new TestScheduler();

      var o1 = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var o2 = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(220)
      );

      var res = scheduler.Start(() =>
          Observable.OnErrorResumeNext(o1, o2)
      );

      // Nothing is expected downstream.
      res.Messages.AssertEqual(
      );

      o1.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );

      // The second source is never reached.
      o2.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// Runs the two-source extension form without a test scheduler, sums the
  /// values received, waits for completion, and expects a total of 3.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_DefaultScheduler_Binary()
  {
      // ManualResetEvent is IDisposable; scope it so the wait handle is released
      // when the test finishes instead of being left for finalization.
      using (var evt = new ManualResetEvent(false))
      {
          var sum = 0;

          Observable.Return(1).OnErrorResumeNext(Observable.Return(2)).Subscribe(x =>
          {
              sum += x;
          }, () => evt.Set());

          // Block until the completion callback has signalled.
          evt.WaitOne();

          Assert.Equal(3, sum);
      }
  }
  /// <summary>
  /// Runs the three-argument static form without a test scheduler, sums the
  /// values received, waits for completion, and expects a total of 6.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_DefaultScheduler_Nary()
  {
      // ManualResetEvent is IDisposable; scope it so the wait handle is released
      // when the test finishes instead of being left for finalization.
      using (var evt = new ManualResetEvent(false))
      {
          var sum = 0;

          Observable.OnErrorResumeNext(Observable.Return(1), Observable.Return(2), Observable.Return(3)).Subscribe(x =>
          {
              sum += x;
          }, () => evt.Set());

          // Block until the completion callback has signalled.
          evt.WaitOne();

          Assert.Equal(6, sum);
      }
  }
  /// <summary>
  /// Runs the form that takes an <see cref="IEnumerable{T}"/> of sources
  /// without a test scheduler, sums the values received, waits for completion,
  /// and expects a total of 6.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_DefaultScheduler_NaryEnumerable()
  {
      // ManualResetEvent is IDisposable; scope it so the wait handle is released
      // when the test finishes instead of being left for finalization.
      using (var evt = new ManualResetEvent(false))
      {
          // Declared as IEnumerable so the call below cannot bind to an array-typed overload.
          IEnumerable<IObservable<int>> sources = new[] { Observable.Return(1), Observable.Return(2), Observable.Return(3) };

          var sum = 0;

          Observable.OnErrorResumeNext(sources).Subscribe(x =>
          {
              sum += x;
          }, () => evt.Set());

          // Block until the completion callback has signalled.
          evt.WaitOne();

          Assert.Equal(6, sum);
      }
  }
  /// <summary>
  /// Uses an iterator that throws as soon as it is enumerated as the source
  /// sequence and expects a single OnError at tick 200 with that exception.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_IteratorThrows()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var observer = testScheduler.Start(
          () => Observable.OnErrorResumeNext(Catch_IteratorThrows_Source(failure, true)));

      observer.Messages.AssertEqual(
          OnError<int>(200, failure)
      );
  }
  /// <summary>
  /// The source enumerable yields one hot observable and then throws when asked
  /// for the next. Expects the observable's values, followed at 225 by an
  /// OnError carrying the enumerable's exception (not the observable's own).
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_EnumerableThrows()
  {
      var testScheduler = new TestScheduler();

      var inner = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnError<int>(225, new Exception())
      );

      // Distinct from the exception inside `inner`, so the assertion below
      // can tell which of the two reached the observer.
      var enumerationFailure = new Exception();

      var sources = new MockEnumerable<IObservable<int>>(
          testScheduler,
          GetObservablesForOnErrorResumeNextThrow(inner, enumerationFailure));

      var observer = testScheduler.Start(() => sources.OnErrorResumeNext());

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnError<int>(225, enumerationFailure)
      );

      inner.Subscriptions.AssertEqual(Subscribe(200, 225));
      sources.Subscriptions.AssertEqual(Subscribe(200, 225));
  }
  /// <summary>
  /// Lazily yields <paramref name="first"/> and then throws <paramref name="ex"/>
  /// when the consumer asks for a second element.
  /// </summary>
  /// <param name="first">The single observable produced before the failure.</param>
  /// <param name="ex">The exception thrown on the second move of the enumerator.</param>
  /// <returns>An iterator; nothing runs until it is enumerated.</returns>
  // Static: the iterator touches no instance state.
  private static IEnumerable<IObservable<int>> GetObservablesForOnErrorResumeNextThrow(IObservable<int> first, Exception ex)
  {
      yield return first;

      throw ex;
  }
  /// <summary>
  /// Mixes hot and cold sources in one enumerable, with the cold source listed
  /// twice, and checks when each value is forwarded and when each source and
  /// the enumerable itself are subscribed and released.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_EnumerableTiming()
  {
      var testScheduler = new TestScheduler();

      var hotFirst = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2), // forwarded
          OnNext(220, 3), // forwarded
          OnCompleted<int>(230)
      );

      // Cold: times are relative to each subscription, and it is subscribed twice below.
      var coldFailing = testScheduler.CreateColdObservable(
          OnNext(50, 4), // forwarded
          OnNext(60, 5), // forwarded
          OnNext(70, 6), // forwarded
          OnError<int>(80, new Exception())
      );

      var hotLate = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(200, 2),
          OnNext(210, 3),
          OnNext(220, 4),
          OnNext(230, 5),
          OnNext(270, 6),
          OnNext(320, 7), // forwarded
          OnNext(330, 8), // forwarded
          OnCompleted<int>(340)
      );

      var sources = new MockEnumerable<ITestableObservable<int>>(
          testScheduler,
          new[] { hotFirst, coldFailing, hotLate, coldFailing });

      var observer = testScheduler.Start(
          () => sources.Select(source => (IObservable<int>)source).OnErrorResumeNext());

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 5),
          OnNext(300, 6),
          OnNext(320, 7),
          OnNext(330, 8),
          OnNext(390, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnCompleted<int>(420)
      );

      hotFirst.Subscriptions.AssertEqual(Subscribe(200, 230));

      // One entry per appearance of the cold source in the enumerable.
      coldFailing.Subscriptions.AssertEqual(
          Subscribe(230, 310),
          Subscribe(340, 420)
      );

      hotLate.Subscriptions.AssertEqual(Subscribe(310, 340));

      // The enumerable stays subscribed for the whole run.
      sources.Subscriptions.AssertEqual(Subscribe(200, 420));
  }
  /// <summary>
  /// Starts the sequence with 300 as the extra argument to <c>Start</c>
  /// (presumably the disposal time; confirm against TestScheduler) and expects
  /// no messages after 300, with the active source and the enumerable both
  /// released at 300.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_Enumerable_Dispose()
  {
      var testScheduler = new TestScheduler();

      var failing = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnError<int>(230, new Exception())
      );

      var longRunning = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(200, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(320, 6),
          OnNext(330, 7),
          OnCompleted<int>(340)
      );

      var sources = new MockEnumerable<ITestableObservable<int>>(
          testScheduler,
          new[] { failing, longRunning });

      var observer = testScheduler.Start(
          () => sources.Select(source => (IObservable<int>)source).OnErrorResumeNext(),
          300
      );

      // The values at 320 and 330 and the completion at 340 are not expected.
      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(240, 4),
          OnNext(270, 5)
      );

      failing.Subscriptions.AssertEqual(Subscribe(200, 230));
      longRunning.Subscriptions.AssertEqual(Subscribe(230, 300));
      sources.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  507. #if !NO_PERF
  /// <summary>
  /// Builds a self-referential sequence (a cold source that errors after three
  /// values, resumed by a deferred copy of itself) and expects the repeating
  /// pattern 1, 2, 3 at 10-tick intervals from tick 200 up to tick 1000, with
  /// every fourth slot empty.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_TailRecursive1()
  {
      var create = 0L;
      var start = 200L;
      var end = 1000L;

      var testScheduler = new TestScheduler();

      var cycle = testScheduler.CreateColdObservable(
          OnNext(10, 1),
          OnNext(20, 2),
          OnNext(30, 3),
          OnError<int>(40, new Exception())
      );

      // Defer keeps the recursion lazy: the next copy is only built on subscription.
      IObservable<int> Loop() => Observable.Defer(() => cycle.OnErrorResumeNext(Loop()));

      var observer = testScheduler.Start(() => Loop(), create, start, end);

      var expected = new List<Recorded<Notification<int>>>();

      for (var tick = start; tick <= end; tick += 10)
      {
          // Position within the 4-slot cycle; slot 0 is where one copy ends and the next begins.
          var slot = ((tick - start) / 10) % 4;

          if (slot != 0)
          {
              expected.Add(OnNext(tick, (int)slot));
          }
      }

      observer.Messages.AssertEqual(expected);
  }
  536. #if HAS_STACKTRACE && !NO_THREAD
  /// <summary>
  /// Recursively chains sequences that each emit one value and then fail, takes
  /// the first ten values, and checks that the stack depth observed at the last
  /// value is less than ten frames deeper than at the first.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_TailRecursive2()
  {
      // StackTrace and ThreadPoolScheduler are fully qualified because the file's
      // using directives do not import System.Diagnostics or
      // System.Reactive.Concurrency; without this the block fails to compile as
      // soon as HAS_STACKTRACE is defined.
      var f = default(Func<int, IObservable<int>>);
      f = x => Observable.Defer(() =>
          Observable.Throw<int>(new Exception(), System.Reactive.Concurrency.ThreadPoolScheduler.Instance)
              .StartWith(x)
              .OnErrorResumeNext(f(x + 1)));

      var lst = new List<int>();

      // Record the stack depth at the moment each value is delivered.
      f(0).Select(x => new System.Diagnostics.StackTrace().FrameCount).Take(10).ForEach(lst.Add);

      Assert.True(lst.Last() - lst.First() < 10);
  }
  546. #endif
  /// <summary>
  /// Places two deferred sources whose factories throw between two
  /// single-value sources and expects the combined sequence to equal 1, 2.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_TailRecursive3()
  {
      var ex = new Exception();

      // Factory body shared by both deferred sources. `ex` is never null here, so
      // the return is only reached in principle; it also gives the method its result type.
      IObservable<int> ThrowOrReturn(int fallback)
      {
          if (ex != null)
          {
              throw ex;
          }

          return Observable.Return(fallback);
      }

      var combined = Observable.OnErrorResumeNext(
          Observable.Return(1),
          Observable.Defer(() => ThrowOrReturn(-2)),
          Observable.Defer(() => ThrowOrReturn(-1)),
          Observable.Return(2)
      );

      var matches = combined.SequenceEqual(new[] { 1, 2 });

      Assert.True(matches.Wait());
  }
  575. #endif
  /// <summary>
  /// Iterator that either throws <paramref name="ex"/> or produces an empty
  /// sequence. Because this is an iterator, the throw happens on the first
  /// move of the enumerator, not when this method is called.
  /// </summary>
  /// <param name="ex">The exception to throw when <paramref name="b"/> is true.</param>
  /// <param name="b">True to throw on enumeration; false to yield nothing.</param>
  /// <returns>A lazily evaluated sequence that is empty or faults on first use.</returns>
  // Static: the iterator touches no instance state.
  private static IEnumerable<IObservable<int>> Catch_IteratorThrows_Source(Exception ex, bool b)
  {
      if (!b)
      {
          // The yield statement is what makes this method an iterator.
          yield break;
      }

      throw ex;
  }
  587. }
  588. }