OnErrorResumeNextTest.cs 20 KB


// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class OnErrorResumeNextTest : ReactiveTest
  22. {
  /// <summary>
  /// Checks that each <c>OnErrorResumeNext</c> call shape rejects a null argument
  /// with an <see cref="ArgumentNullException"/>.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_ArgumentChecking()
  {
      var dummy = DummyObservable<int>.Instance;

      // A null source collection, typed first as an array and then as an enumerable.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.OnErrorResumeNext<int>((IObservable<int>[])null));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.OnErrorResumeNext<int>((IEnumerable<IObservable<int>>)null));

      // A null in either position of a two-argument call, spelled with an explicit cast...
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.OnErrorResumeNext<int>((IObservable<int>)null, dummy));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.OnErrorResumeNext<int>(dummy, (IObservable<int>)null));

      // ...and with an untyped null literal.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.OnErrorResumeNext<int>(null, dummy));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.OnErrorResumeNext<int>(dummy, null));
  }
  /// <summary>
  /// When the source collection is a <c>RogueEnumerable</c> built around an exception,
  /// the result is expected to consist of a single OnError carrying that exception at
  /// the subscription time (200).
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_IEofIO_GetEnumeratorThrows()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var sources = new RogueEnumerable<IObservable<int>>(failure);

      var observer = scheduler.Start(() => Observable.OnErrorResumeNext(sources));

      observer.Messages.AssertEqual(OnError<int>(200, failure));
  }
  /// <summary>
  /// Chains three cold sources, the second of which ends with an error, and checks
  /// that the values of all three are relayed back to back and that the result
  /// completes when the last source completes.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_IEofIO()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateColdObservable<int>(
          OnNext(10, 1),
          OnNext(20, 2),
          OnNext(30, 3),
          OnCompleted<int>(40));

      // The only source that terminates with an error.
      var second = scheduler.CreateColdObservable<int>(
          OnNext(10, 4),
          OnNext(20, 5),
          OnError<int>(30, new Exception()));

      var third = scheduler.CreateColdObservable<int>(
          OnNext(10, 6),
          OnNext(20, 7),
          OnNext(30, 8),
          OnNext(40, 9),
          OnCompleted<int>(50));

      var observer = scheduler.Start(() => Observable.OnErrorResumeNext(new[] { first, second, third }));

      observer.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(250, 4),
          OnNext(260, 5),
          OnNext(280, 6),
          OnNext(290, 7),
          OnNext(300, 8),
          OnNext(310, 9),
          OnCompleted<int>(320));

      // Each source is subscribed at the moment the previous one terminates.
      first.Subscriptions.AssertEqual(Subscribe(200, 240));
      second.Subscriptions.AssertEqual(Subscribe(240, 270));
      third.Subscriptions.AssertEqual(Subscribe(270, 320));
  }
  /// <summary>
  /// Two hot sources that both complete normally: the second is picked up once
  /// the first completes, and the result completes with the second.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_NoErrors()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnCompleted<int>(230));

      var second = scheduler.CreateHotObservable(
          OnNext(240, 4),
          OnCompleted<int>(250));

      var observer = scheduler.Start(() => first.OnErrorResumeNext(second));

      // The value at 150 precedes the subscription at 200 and is not expected.
      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(240, 4),
          OnCompleted<int>(250));

      first.Subscriptions.AssertEqual(Subscribe(200, 230));
      second.Subscriptions.AssertEqual(Subscribe(230, 250));
  }
  /// <summary>
  /// The first hot source fails at 230; the expected output does not contain that
  /// error and instead continues with the second source until it completes.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_Error()
  {
      var scheduler = new TestScheduler();

      var failing = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnError<int>(230, new Exception()));

      var fallback = scheduler.CreateHotObservable(
          OnNext(240, 4),
          OnCompleted<int>(250));

      var observer = scheduler.Start(() => failing.OnErrorResumeNext(fallback));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(240, 4),
          OnCompleted<int>(250));

      // The hand-over happens at the time of the error.
      failing.Subscriptions.AssertEqual(Subscribe(200, 230));
      fallback.Subscriptions.AssertEqual(Subscribe(230, 250));
  }
  /// <summary>
  /// Two consecutive failing sources followed by an empty one: both errors are
  /// expected to be skipped over and the result completes with the third source.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_ErrorMultiple()
  {
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnError<int>(220, new Exception()));

      var second = scheduler.CreateHotObservable(
          OnNext(230, 3),
          OnError<int>(240, new Exception()));

      var third = scheduler.CreateHotObservable(
          OnCompleted<int>(250));

      var observer = scheduler.Start(() => Observable.OnErrorResumeNext(first, second, third));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(230, 3),
          OnCompleted<int>(250));

      first.Subscriptions.AssertEqual(Subscribe(200, 220));
      second.Subscriptions.AssertEqual(Subscribe(220, 240));
      third.Subscriptions.AssertEqual(Subscribe(240, 250));
  }
  /// <summary>
  /// Runs five hot sources in sequence through the enumerable extension form: one
  /// that yields nothing after subscription, a single-value one, a two-value one,
  /// one that only fails, and a final single-value one.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_EmptyReturnThrowAndMore()
  {
      var scheduler = new TestScheduler();

      // Effectively empty: its only value (150) is before the subscription at 200.
      var empty = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(205));

      var single = scheduler.CreateHotObservable(
          OnNext(215, 2),
          OnCompleted<int>(220));

      var pair = scheduler.CreateHotObservable(
          OnNext(225, 3),
          OnNext(230, 4),
          OnCompleted<int>(235));

      var failing = scheduler.CreateHotObservable(
          OnError<int>(240, new Exception()));

      var last = scheduler.CreateHotObservable(
          OnNext(245, 5),
          OnCompleted<int>(250));

      var observer = scheduler.Start(() => new[] { empty, single, pair, failing, last }.OnErrorResumeNext());

      observer.Messages.AssertEqual(
          OnNext(215, 2),
          OnNext(225, 3),
          OnNext(230, 4),
          OnNext(245, 5),
          OnCompleted<int>(250));

      empty.Subscriptions.AssertEqual(Subscribe(200, 205));
      single.Subscriptions.AssertEqual(Subscribe(205, 220));
      pair.Subscriptions.AssertEqual(Subscribe(220, 235));
      failing.Subscriptions.AssertEqual(Subscribe(235, 240));
      last.Subscriptions.AssertEqual(Subscribe(240, 250));
  }
  /// <summary>
  /// An error in the final source gets no special treatment: the expected output
  /// ends with OnCompleted at the time of that error rather than with OnError.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_LastIsntSpecial()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      var first = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(220));

      var failingLast = scheduler.CreateHotObservable(
          OnError<int>(230, failure));

      var observer = scheduler.Start(() => first.OnErrorResumeNext(failingLast));

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnCompleted<int>(230));

      first.Subscriptions.AssertEqual(Subscribe(200, 220));
      failingLast.Subscriptions.AssertEqual(Subscribe(220, 230));
  }
  /// <summary>
  /// With a single failing source, the expected output is a plain OnCompleted at
  /// the time of the error.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_SingleSourceDoesntThrow()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      var onlySource = scheduler.CreateHotObservable(
          OnError<int>(230, failure));

      var observer = scheduler.Start(() => Observable.OnErrorResumeNext(onlySource));

      observer.Messages.AssertEqual(OnCompleted<int>(230));

      onlySource.Subscriptions.AssertEqual(Subscribe(200, 230));
  }
  /// <summary>
  /// The second source never terminates after subscription, so the expected output
  /// has no terminal notification and the second subscription stays open until the
  /// scheduler's disposal time (1000).
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_EndWithNever()
  {
      var scheduler = new TestScheduler();

      var o1 = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(220)
      );

      // Only emits before the subscription window, so it is silent once subscribed.
      var o2 = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var res = scheduler.Start(() =>
          Observable.OnErrorResumeNext(o1, o2)
      );

      res.Messages.AssertEqual(
          OnNext(210, 2)
      );

      o1.Subscriptions.AssertEqual(
          Subscribe(200, 220)
      );

      o2.Subscriptions.AssertEqual(
          Subscribe(220, 1000)
      );
  }
  /// <summary>
  /// The first source never terminates after subscription, so no messages are
  /// expected and the second source is expected to have no subscriptions at all.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_StartWithNever()
  {
      var scheduler = new TestScheduler();

      // Only emits before the subscription window, so it is silent once subscribed.
      var o1 = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var o2 = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(220)
      );

      var res = scheduler.Start(() =>
          Observable.OnErrorResumeNext(o1, o2)
      );

      res.Messages.AssertEqual(
      );

      o1.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );

      o2.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// Runs the two-source form without a test scheduler and checks that both values
  /// are delivered before completion.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_DefaultScheduler_Binary()
  {
      int sum = 0;

      // The event and the subscription are both disposed (subscription first), and
      // the wait is bounded so a sequence that never completes fails the test
      // instead of hanging the run.
      using (var evt = new ManualResetEvent(false))
      using (Observable.Return(1).OnErrorResumeNext(Observable.Return(2)).Subscribe(x =>
      {
          sum += x;
      }, () => evt.Set()))
      {
          Assert.True(evt.WaitOne(TimeSpan.FromSeconds(30)), "The sequence did not complete in time.");
      }

      Assert.Equal(3, sum);
  }
  /// <summary>
  /// Runs the multi-argument form without a test scheduler and checks that all
  /// three values are delivered before completion.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_DefaultScheduler_Nary()
  {
      int sum = 0;

      // The event and the subscription are both disposed (subscription first), and
      // the wait is bounded so a sequence that never completes fails the test
      // instead of hanging the run.
      using (var evt = new ManualResetEvent(false))
      using (Observable.OnErrorResumeNext(Observable.Return(1), Observable.Return(2), Observable.Return(3)).Subscribe(x =>
      {
          sum += x;
      }, () => evt.Set()))
      {
          Assert.True(evt.WaitOne(TimeSpan.FromSeconds(30)), "The sequence did not complete in time.");
      }

      Assert.Equal(6, sum);
  }
  /// <summary>
  /// Runs the enumerable form without a test scheduler and checks that all three
  /// values are delivered before completion.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_DefaultScheduler_NaryEnumerable()
  {
      // Typed as IEnumerable so the enumerable overload is the one being exercised.
      IEnumerable<IObservable<int>> sources = new[] { Observable.Return(1), Observable.Return(2), Observable.Return(3) };
      int sum = 0;

      // The event and the subscription are both disposed (subscription first), and
      // the wait is bounded so a sequence that never completes fails the test
      // instead of hanging the run.
      using (var evt = new ManualResetEvent(false))
      using (Observable.OnErrorResumeNext(sources).Subscribe(x =>
      {
          sum += x;
      }, () => evt.Set()))
      {
          Assert.True(evt.WaitOne(TimeSpan.FromSeconds(30)), "The sequence did not complete in time.");
      }

      Assert.Equal(6, sum);
  }
  /// <summary>
  /// Feeds the operator an iterator that throws as soon as it is advanced and
  /// expects that exception as an OnError at the subscription time (200).
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_IteratorThrows()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      var observer = scheduler.Start(
          () => Observable.OnErrorResumeNext<int>(Catch_IteratorThrows_Source(failure, true)));

      observer.Messages.AssertEqual(OnError<int>(200, failure));
  }
  /// <summary>
  /// The source enumerable yields one observable and then throws. The expected
  /// output relays that observable's values and then reports the enumerable's
  /// exception (not the observable's own error) at the time the observable failed.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_EnumerableThrows()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnError<int>(225, new Exception()));

      // Distinct from the exception inside the observable, so the assertion below
      // pins which of the two is surfaced.
      var enumerationFailure = new Exception();
      var sources = new MockEnumerable<IObservable<int>>(
          scheduler,
          GetObservablesForOnErrorResumeNextThrow(source, enumerationFailure));

      var observer = scheduler.Start(() => sources.OnErrorResumeNext());

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnError<int>(225, enumerationFailure));

      source.Subscriptions.AssertEqual(Subscribe(200, 225));
      sources.Subscriptions.AssertEqual(Subscribe(200, 225));
  }
  /// <summary>
  /// Iterator that yields <paramref name="first"/> and then throws
  /// <paramref name="ex"/> when the consumer asks for the next element.
  /// </summary>
  /// <param name="first">The only observable the sequence yields.</param>
  /// <param name="ex">The exception thrown after <paramref name="first"/> has been yielded.</param>
  /// <returns>A lazily evaluated sequence; nothing runs until it is enumerated.</returns>
  private IEnumerable<IObservable<int>> GetObservablesForOnErrorResumeNextThrow(IObservable<int> first, Exception ex)
  {
      yield return first;

      // Reached only when the enumerator is advanced a second time.
      throw ex;
  }
  /// <summary>
  /// Pins the subscription timing when sources come from an enumerable: a hot
  /// source, a cold failing source, another hot source, and the same cold source
  /// a second time.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_EnumerableTiming()
  {
      var scheduler = new TestScheduler();

      var hotFirst = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2), // expected in the output
          OnNext(220, 3), // expected in the output
          OnCompleted<int>(230));

      // Cold, so its relative times are replayed on each of its two subscriptions.
      var coldFailing = scheduler.CreateColdObservable(
          OnNext(50, 4), // expected in the output
          OnNext(60, 5), // expected in the output
          OnNext(70, 6), // expected in the output
          OnError<int>(80, new Exception()));

      var hotLate = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(200, 2),
          OnNext(210, 3),
          OnNext(220, 4),
          OnNext(230, 5),
          OnNext(270, 6),
          OnNext(320, 7), // expected in the output
          OnNext(330, 8), // expected in the output
          OnCompleted<int>(340));

      var sources = new MockEnumerable<ITestableObservable<int>>(
          scheduler,
          new[] { hotFirst, coldFailing, hotLate, coldFailing });

      var observer = scheduler.Start(
          () => sources.Select(source => (IObservable<int>)source).OnErrorResumeNext());

      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 5),
          OnNext(300, 6),
          OnNext(320, 7),
          OnNext(330, 8),
          OnNext(390, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnCompleted<int>(420));

      hotFirst.Subscriptions.AssertEqual(Subscribe(200, 230));

      coldFailing.Subscriptions.AssertEqual(
          Subscribe(230, 310),
          Subscribe(340, 420));

      hotLate.Subscriptions.AssertEqual(Subscribe(310, 340));

      // The enumerable itself stays in use for the whole run.
      sources.Subscriptions.AssertEqual(Subscribe(200, 420));
  }
  /// <summary>
  /// Disposes the subscription at 300, while the second source is still active,
  /// and checks that both the active source and the source enumerable are
  /// released at that time.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_Enumerable_Dispose()
  {
      var scheduler = new TestScheduler();

      var failing = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnError<int>(230, new Exception()));

      var longRunning = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(200, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(320, 6),
          OnNext(330, 7),
          OnCompleted<int>(340));

      var sources = new MockEnumerable<ITestableObservable<int>>(
          scheduler,
          new[] { failing, longRunning });

      // The second argument to Start is the disposal time.
      var observer = scheduler.Start(
          () => sources.Select(source => (IObservable<int>)source).OnErrorResumeNext(),
          300);

      // Nothing after 300 is expected, including the completion at 340.
      observer.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(240, 4),
          OnNext(270, 5));

      failing.Subscriptions.AssertEqual(Subscribe(200, 230));
      longRunning.Subscriptions.AssertEqual(Subscribe(230, 300));
      sources.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  513. #if !NO_PERF
  /// <summary>
  /// Builds a self-referential chain in which a cold source that emits 1, 2, 3 and
  /// then fails resumes with a deferred copy of the same chain, and checks the
  /// repeating output over the whole subscription window.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_TailRecursive1()
  {
      const long Created = 0L;
      const long Subscribed = 200L;
      const long Disposed = 1000L;

      var scheduler = new TestScheduler();

      var source = scheduler.CreateColdObservable<int>(
          OnNext(10, 1),
          OnNext(20, 2),
          OnNext(30, 3),
          OnError<int>(40, new Exception()));

      // Declared before assignment so the lambda can refer to itself.
      Func<IObservable<int>> chain = null;
      chain = () => Observable.Defer(() => source.OnErrorResumeNext(chain()));

      var observer = scheduler.Start(() => chain(), Created, Subscribed, Disposed);

      // Every 10 ticks is one slot in a cycle of four: slots 1..3 carry the values
      // 1..3, and slot 0 is where the error / resubscription falls (no value).
      var expected = new List<Recorded<Notification<int>>>();
      for (var time = Subscribed; time <= Disposed; time += 10)
      {
          var slot = ((time - Subscribed) / 10) % 4;
          if (slot == 0)
          {
              continue;
          }

          expected.Add(OnNext(time, (int)slot));
      }

      observer.Messages.AssertEqual(expected);
  }
  543. #if HAS_STACKTRACE && !NO_THREAD
  /// <summary>
  /// Samples the stack frame count while walking ten links of a recursively
  /// defined chain and asserts that the count grows by fewer than ten frames
  /// between the first and the last sample.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_TailRecursive2()
  {
      // Declared before assignment so the lambda can refer to itself.
      var f = default(Func<int, IObservable<int>>);
      f = x => Observable.Defer(() => Observable.Throw<int>(new Exception(), ThreadPoolScheduler.Instance).StartWith(x).OnErrorResumeNext(f(x + 1)));

      var lst = new List<int>();

      // StackTrace is fully qualified because this file has no using directive
      // for System.Diagnostics.
      f(0).Select(x => new System.Diagnostics.StackTrace().FrameCount).Take(10).ForEach(lst.Add);

      Assert.True(lst.Last() - lst.First() < 10);
  }
  553. #endif
  /// <summary>
  /// Places two deferred sources whose factories throw between two single-value
  /// sources and checks that only the values 1 and 2 come through.
  /// </summary>
  [Fact]
  public void OnErrorResumeNext_TailRecursive3()
  {
      var failure = new Exception();

      // The null checks are always false at run time, so each factory throws
      // instead of returning its negative value.
      var throwingFirst = Observable.Defer(() =>
      {
          if (failure == null)
          {
              return Observable.Return(-2);
          }

          throw failure;
      });

      var throwingSecond = Observable.Defer(() =>
      {
          if (failure == null)
          {
              return Observable.Return(-1);
          }

          throw failure;
      });

      var matches = Observable
          .OnErrorResumeNext(
              Observable.Return(1),
              throwingFirst,
              throwingSecond,
              Observable.Return(2))
          .SequenceEqual(new[] { 1, 2 });

      Assert.True(matches.Wait());
  }
  582. #endif
  /// <summary>
  /// Iterator that either ends immediately or throws, depending on <paramref name="b"/>.
  /// Because this is an iterator, the throw happens when the sequence is first
  /// advanced, not when this method is called.
  /// </summary>
  /// <param name="ex">The exception thrown when <paramref name="b"/> is true.</param>
  /// <param name="b">True to throw <paramref name="ex"/>; false to produce an empty sequence.</param>
  /// <returns>A lazily evaluated sequence that never yields an element.</returns>
  private IEnumerable<IObservable<int>> Catch_IteratorThrows_Source(Exception ex, bool b)
  {
      if (!b)
      {
          yield break;
      }

      throw ex;
  }
  590. }
  591. }