OnErrorResumeNextTest.cs 20 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Linq;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using ReactiveTests.Dummies;
  12. using Microsoft.VisualStudio.TestTools.UnitTesting;
  13. using Assert = Xunit.Assert;
  14. namespace ReactiveTests.Tests
  15. {
  16. [TestClass]
  17. public class OnErrorResumeNextTest : ReactiveTest
  18. {
  19. [TestMethod]
  20. public void OnErrorResumeNext_ArgumentChecking()
  21. {
  22. var xs = DummyObservable<int>.Instance;
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext<int>(null));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext((IEnumerable<IObservable<int>>)null));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext(null, xs));
  26. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext(xs, null));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext(null, xs));
  28. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.OnErrorResumeNext(xs, null));
  29. }
  30. [TestMethod]
  31. public void OnErrorResumeNext_IEofIO_GetEnumeratorThrows()
  32. {
  33. var ex = new Exception();
  34. var scheduler = new TestScheduler();
  35. var xss = new RogueEnumerable<IObservable<int>>(ex);
  36. var res = scheduler.Start(() =>
  37. Observable.OnErrorResumeNext(xss)
  38. );
  39. res.Messages.AssertEqual(
  40. OnError<int>(200, ex)
  41. );
  42. }
  43. [TestMethod]
  44. public void OnErrorResumeNext_IEofIO()
  45. {
  46. var scheduler = new TestScheduler();
  47. var xs1 = scheduler.CreateColdObservable(
  48. OnNext(10, 1),
  49. OnNext(20, 2),
  50. OnNext(30, 3),
  51. OnCompleted<int>(40)
  52. );
  53. var xs2 = scheduler.CreateColdObservable(
  54. OnNext(10, 4),
  55. OnNext(20, 5),
  56. OnError<int>(30, new Exception())
  57. );
  58. var xs3 = scheduler.CreateColdObservable(
  59. OnNext(10, 6),
  60. OnNext(20, 7),
  61. OnNext(30, 8),
  62. OnNext(40, 9),
  63. OnCompleted<int>(50)
  64. );
  65. var res = scheduler.Start(() =>
  66. Observable.OnErrorResumeNext([xs1, xs2, xs3])
  67. );
  68. res.Messages.AssertEqual(
  69. OnNext(210, 1),
  70. OnNext(220, 2),
  71. OnNext(230, 3),
  72. OnNext(250, 4),
  73. OnNext(260, 5),
  74. OnNext(280, 6),
  75. OnNext(290, 7),
  76. OnNext(300, 8),
  77. OnNext(310, 9),
  78. OnCompleted<int>(320)
  79. );
  80. xs1.Subscriptions.AssertEqual(
  81. Subscribe(200, 240)
  82. );
  83. xs2.Subscriptions.AssertEqual(
  84. Subscribe(240, 270)
  85. );
  86. xs3.Subscriptions.AssertEqual(
  87. Subscribe(270, 320)
  88. );
  89. }
  90. [TestMethod]
  91. public void OnErrorResumeNext_NoErrors()
  92. {
  93. var scheduler = new TestScheduler();
  94. var o1 = scheduler.CreateHotObservable(
  95. OnNext(150, 1),
  96. OnNext(210, 2),
  97. OnNext(220, 3),
  98. OnCompleted<int>(230)
  99. );
  100. var o2 = scheduler.CreateHotObservable(
  101. OnNext(240, 4),
  102. OnCompleted<int>(250)
  103. );
  104. var res = scheduler.Start(() =>
  105. o1.OnErrorResumeNext(o2)
  106. );
  107. res.Messages.AssertEqual(
  108. OnNext(210, 2),
  109. OnNext(220, 3),
  110. OnNext(240, 4),
  111. OnCompleted<int>(250)
  112. );
  113. o1.Subscriptions.AssertEqual(
  114. Subscribe(200, 230)
  115. );
  116. o2.Subscriptions.AssertEqual(
  117. Subscribe(230, 250)
  118. );
  119. }
  120. [TestMethod]
  121. public void OnErrorResumeNext_Error()
  122. {
  123. var scheduler = new TestScheduler();
  124. var o1 = scheduler.CreateHotObservable(
  125. OnNext(150, 1),
  126. OnNext(210, 2),
  127. OnNext(220, 3),
  128. OnError<int>(230, new Exception())
  129. );
  130. var o2 = scheduler.CreateHotObservable(
  131. OnNext(240, 4),
  132. OnCompleted<int>(250)
  133. );
  134. var res = scheduler.Start(() =>
  135. o1.OnErrorResumeNext(o2)
  136. );
  137. res.Messages.AssertEqual(
  138. OnNext(210, 2),
  139. OnNext(220, 3),
  140. OnNext(240, 4),
  141. OnCompleted<int>(250)
  142. );
  143. o1.Subscriptions.AssertEqual(
  144. Subscribe(200, 230)
  145. );
  146. o2.Subscriptions.AssertEqual(
  147. Subscribe(230, 250)
  148. );
  149. }
  150. [TestMethod]
  151. public void OnErrorResumeNext_ErrorMultiple()
  152. {
  153. var scheduler = new TestScheduler();
  154. var o1 = scheduler.CreateHotObservable(
  155. OnNext(150, 1),
  156. OnNext(210, 2),
  157. OnError<int>(220, new Exception())
  158. );
  159. var o2 = scheduler.CreateHotObservable(
  160. OnNext(230, 3),
  161. OnError<int>(240, new Exception())
  162. );
  163. var o3 = scheduler.CreateHotObservable(
  164. OnCompleted<int>(250)
  165. );
  166. var res = scheduler.Start(() =>
  167. Observable.OnErrorResumeNext(o1, o2, o3)
  168. );
  169. res.Messages.AssertEqual(
  170. OnNext(210, 2),
  171. OnNext(230, 3),
  172. OnCompleted<int>(250)
  173. );
  174. o1.Subscriptions.AssertEqual(
  175. Subscribe(200, 220)
  176. );
  177. o2.Subscriptions.AssertEqual(
  178. Subscribe(220, 240)
  179. );
  180. o3.Subscriptions.AssertEqual(
  181. Subscribe(240, 250)
  182. );
  183. }
  184. [TestMethod]
  185. public void OnErrorResumeNext_EmptyReturnThrowAndMore()
  186. {
  187. var scheduler = new TestScheduler();
  188. var o1 = scheduler.CreateHotObservable(
  189. OnNext(150, 1),
  190. OnCompleted<int>(205)
  191. );
  192. var o2 = scheduler.CreateHotObservable(
  193. OnNext(215, 2),
  194. OnCompleted<int>(220)
  195. );
  196. var o3 = scheduler.CreateHotObservable(
  197. OnNext(225, 3),
  198. OnNext(230, 4),
  199. OnCompleted<int>(235)
  200. );
  201. var o4 = scheduler.CreateHotObservable(
  202. OnError<int>(240, new Exception())
  203. );
  204. var o5 = scheduler.CreateHotObservable(
  205. OnNext(245, 5),
  206. OnCompleted<int>(250)
  207. );
  208. var res = scheduler.Start(() =>
  209. new[] { o1, o2, o3, o4, o5 }.OnErrorResumeNext()
  210. );
  211. res.Messages.AssertEqual(
  212. OnNext(215, 2),
  213. OnNext(225, 3),
  214. OnNext(230, 4),
  215. OnNext(245, 5),
  216. OnCompleted<int>(250)
  217. );
  218. o1.Subscriptions.AssertEqual(
  219. Subscribe(200, 205)
  220. );
  221. o2.Subscriptions.AssertEqual(
  222. Subscribe(205, 220)
  223. );
  224. o3.Subscriptions.AssertEqual(
  225. Subscribe(220, 235)
  226. );
  227. o4.Subscriptions.AssertEqual(
  228. Subscribe(235, 240)
  229. );
  230. o5.Subscriptions.AssertEqual(
  231. Subscribe(240, 250)
  232. );
  233. }
  234. [TestMethod]
  235. public void OnErrorResumeNext_LastIsntSpecial()
  236. {
  237. var scheduler = new TestScheduler();
  238. var ex = new Exception();
  239. var o1 = scheduler.CreateHotObservable(
  240. OnNext(150, 1),
  241. OnNext(210, 2),
  242. OnCompleted<int>(220)
  243. );
  244. var o2 = scheduler.CreateHotObservable(
  245. OnError<int>(230, ex)
  246. );
  247. var res = scheduler.Start(() =>
  248. o1.OnErrorResumeNext(o2)
  249. );
  250. res.Messages.AssertEqual(
  251. OnNext(210, 2),
  252. OnCompleted<int>(230)
  253. );
  254. o1.Subscriptions.AssertEqual(
  255. Subscribe(200, 220)
  256. );
  257. o2.Subscriptions.AssertEqual(
  258. Subscribe(220, 230)
  259. );
  260. }
  261. [TestMethod]
  262. public void OnErrorResumeNext_SingleSourceDoesntThrow()
  263. {
  264. var scheduler = new TestScheduler();
  265. var ex = new Exception();
  266. var o1 = scheduler.CreateHotObservable(
  267. OnError<int>(230, ex)
  268. );
  269. var res = scheduler.Start(() =>
  270. Observable.OnErrorResumeNext(o1)
  271. );
  272. res.Messages.AssertEqual(
  273. OnCompleted<int>(230)
  274. );
  275. o1.Subscriptions.AssertEqual(
  276. Subscribe(200, 230)
  277. );
  278. }
  279. [TestMethod]
  280. public void OnErrorResumeNext_EndWithNever()
  281. {
  282. var scheduler = new TestScheduler();
  283. var ex = new Exception();
  284. var o1 = scheduler.CreateHotObservable(
  285. OnNext(150, 1),
  286. OnNext(210, 2),
  287. OnCompleted<int>(220)
  288. );
  289. var o2 = scheduler.CreateHotObservable(
  290. OnNext(150, 1)
  291. );
  292. var res = scheduler.Start(() =>
  293. Observable.OnErrorResumeNext(o1, o2)
  294. );
  295. res.Messages.AssertEqual(
  296. OnNext(210, 2)
  297. );
  298. o1.Subscriptions.AssertEqual(
  299. Subscribe(200, 220)
  300. );
  301. o2.Subscriptions.AssertEqual(
  302. Subscribe(220, 1000)
  303. );
  304. }
  305. [TestMethod]
  306. public void OnErrorResumeNext_StartWithNever()
  307. {
  308. var scheduler = new TestScheduler();
  309. var ex = new Exception();
  310. var o1 = scheduler.CreateHotObservable(
  311. OnNext(150, 1)
  312. );
  313. var o2 = scheduler.CreateHotObservable(
  314. OnNext(150, 1),
  315. OnNext(210, 2),
  316. OnCompleted<int>(220)
  317. );
  318. var res = scheduler.Start(() =>
  319. Observable.OnErrorResumeNext(o1, o2)
  320. );
  321. res.Messages.AssertEqual(
  322. );
  323. o1.Subscriptions.AssertEqual(
  324. Subscribe(200, 1000)
  325. );
  326. o2.Subscriptions.AssertEqual(
  327. );
  328. }
  329. [TestMethod]
  330. public void OnErrorResumeNext_DefaultScheduler_Binary()
  331. {
  332. var evt = new ManualResetEvent(false);
  333. var sum = 0;
  334. Observable.Return(1).OnErrorResumeNext(Observable.Return(2)).Subscribe(x =>
  335. {
  336. sum += x;
  337. }, () => evt.Set());
  338. evt.WaitOne();
  339. Assert.Equal(3, sum);
  340. }
  341. [TestMethod]
  342. public void OnErrorResumeNext_DefaultScheduler_Nary()
  343. {
  344. var evt = new ManualResetEvent(false);
  345. var sum = 0;
  346. Observable.OnErrorResumeNext(Observable.Return(1), Observable.Return(2), Observable.Return(3)).Subscribe(x =>
  347. {
  348. sum += x;
  349. }, () => evt.Set());
  350. evt.WaitOne();
  351. Assert.Equal(6, sum);
  352. }
  353. [TestMethod]
  354. public void OnErrorResumeNext_DefaultScheduler_NaryEnumerable()
  355. {
  356. var evt = new ManualResetEvent(false);
  357. IEnumerable<IObservable<int>> sources = [Observable.Return(1), Observable.Return(2), Observable.Return(3)];
  358. var sum = 0;
  359. Observable.OnErrorResumeNext(sources).Subscribe(x =>
  360. {
  361. sum += x;
  362. }, () => evt.Set());
  363. evt.WaitOne();
  364. Assert.Equal(6, sum);
  365. }
  366. [TestMethod]
  367. public void OnErrorResumeNext_IteratorThrows()
  368. {
  369. var scheduler = new TestScheduler();
  370. var ex = new Exception();
  371. var res = scheduler.Start(() =>
  372. Observable.OnErrorResumeNext(Catch_IteratorThrows_Source(ex, true))
  373. );
  374. res.Messages.AssertEqual(
  375. OnError<int>(200, ex)
  376. );
  377. }
  378. [TestMethod]
  379. public void OnErrorResumeNext_EnumerableThrows()
  380. {
  381. var scheduler = new TestScheduler();
  382. var o = scheduler.CreateHotObservable(
  383. OnNext(150, 1),
  384. OnNext(210, 2),
  385. OnNext(220, 3),
  386. OnError<int>(225, new Exception())
  387. );
  388. var ex = new Exception();
  389. var xss = new MockEnumerable<IObservable<int>>(scheduler, GetObservablesForOnErrorResumeNextThrow(o, ex));
  390. var res = scheduler.Start(() =>
  391. xss.OnErrorResumeNext()
  392. );
  393. res.Messages.AssertEqual(
  394. OnNext(210, 2),
  395. OnNext(220, 3),
  396. OnError<int>(225, ex)
  397. );
  398. o.Subscriptions.AssertEqual(
  399. Subscribe(200, 225)
  400. );
  401. xss.Subscriptions.AssertEqual(
  402. Subscribe(200, 225)
  403. );
  404. }
  405. private IEnumerable<IObservable<int>> GetObservablesForOnErrorResumeNextThrow(IObservable<int> first, Exception ex)
  406. {
  407. yield return first;
  408. throw ex;
  409. }
  410. [TestMethod]
  411. public void OnErrorResumeNext_EnumerableTiming()
  412. {
  413. var scheduler = new TestScheduler();
  414. var o1 = scheduler.CreateHotObservable(
  415. OnNext(150, 1),
  416. OnNext(210, 2), // !
  417. OnNext(220, 3), // !
  418. OnCompleted<int>(230)
  419. );
  420. var o2 = scheduler.CreateColdObservable(
  421. OnNext(50, 4), // !
  422. OnNext(60, 5), // !
  423. OnNext(70, 6), // !
  424. OnError<int>(80, new Exception())
  425. );
  426. var o3 = scheduler.CreateHotObservable(
  427. OnNext(150, 1),
  428. OnNext(200, 2),
  429. OnNext(210, 3),
  430. OnNext(220, 4),
  431. OnNext(230, 5),
  432. OnNext(270, 6),
  433. OnNext(320, 7), // !
  434. OnNext(330, 8), // !
  435. OnCompleted<int>(340)
  436. );
  437. var xss = new MockEnumerable<ITestableObservable<int>>(scheduler, [o1, o2, o3, o2]);
  438. var res = scheduler.Start(() =>
  439. xss.Select(xs => (IObservable<int>)xs).OnErrorResumeNext()
  440. );
  441. res.Messages.AssertEqual(
  442. OnNext(210, 2),
  443. OnNext(220, 3),
  444. OnNext(280, 4),
  445. OnNext(290, 5),
  446. OnNext(300, 6),
  447. OnNext(320, 7),
  448. OnNext(330, 8),
  449. OnNext(390, 4),
  450. OnNext(400, 5),
  451. OnNext(410, 6),
  452. OnCompleted<int>(420)
  453. );
  454. o1.Subscriptions.AssertEqual(
  455. Subscribe(200, 230)
  456. );
  457. o2.Subscriptions.AssertEqual(
  458. Subscribe(230, 310),
  459. Subscribe(340, 420)
  460. );
  461. o3.Subscriptions.AssertEqual(
  462. Subscribe(310, 340)
  463. );
  464. xss.Subscriptions.AssertEqual(
  465. Subscribe(200, 420)
  466. );
  467. }
  468. [TestMethod]
  469. public void OnErrorResumeNext_Enumerable_Dispose()
  470. {
  471. var scheduler = new TestScheduler();
  472. var o1 = scheduler.CreateHotObservable(
  473. OnNext(150, 1),
  474. OnNext(210, 2),
  475. OnNext(220, 3),
  476. OnError<int>(230, new Exception())
  477. );
  478. var o2 = scheduler.CreateHotObservable(
  479. OnNext(150, 1),
  480. OnNext(200, 2),
  481. OnNext(210, 3),
  482. OnNext(240, 4),
  483. OnNext(270, 5),
  484. OnNext(320, 6),
  485. OnNext(330, 7),
  486. OnCompleted<int>(340)
  487. );
  488. var xss = new MockEnumerable<ITestableObservable<int>>(scheduler, [o1, o2]);
  489. var res = scheduler.Start(() =>
  490. xss.Select(xs => (IObservable<int>)xs).OnErrorResumeNext(),
  491. 300
  492. );
  493. res.Messages.AssertEqual(
  494. OnNext(210, 2),
  495. OnNext(220, 3),
  496. OnNext(240, 4),
  497. OnNext(270, 5)
  498. );
  499. o1.Subscriptions.AssertEqual(
  500. Subscribe(200, 230)
  501. );
  502. o2.Subscriptions.AssertEqual(
  503. Subscribe(230, 300)
  504. );
  505. xss.Subscriptions.AssertEqual(
  506. Subscribe(200, 300)
  507. );
  508. }
  509. #if !NO_PERF
  510. [TestMethod]
  511. public void OnErrorResumeNext_TailRecursive1()
  512. {
  513. var create = 0L;
  514. var start = 200L;
  515. var end = 1000L;
  516. var scheduler = new TestScheduler();
  517. var o = scheduler.CreateColdObservable(
  518. OnNext(10, 1),
  519. OnNext(20, 2),
  520. OnNext(30, 3),
  521. OnError<int>(40, new Exception())
  522. );
  523. IObservable<int> f() => Observable.Defer(() => o.OnErrorResumeNext(f()));
  524. var res = scheduler.Start(() => f(), create, start, end);
  525. var expected = new List<Recorded<Notification<int>>>();
  526. var t = start;
  527. while (t <= end)
  528. {
  529. var n = (t - start) / 10;
  530. if (n % 4 != 0)
  531. {
  532. expected.Add(OnNext(t, (int)(n % 4)));
  533. }
  534. t += 10;
  535. }
  536. res.Messages.AssertEqual(expected);
  537. }
  538. #if HAS_STACKTRACE
  539. [TestMethod]
  540. public void OnErrorResumeNext_TailRecursive2()
  541. {
  542. var f = default(Func<int, IObservable<int>>);
  543. f = x => Observable.Defer(() => Observable.Throw<int>(new Exception(), ThreadPoolScheduler.Instance).StartWith(x).OnErrorResumeNext(f(x + 1)));
  544. var lst = new List<int>();
  545. f(0).Select(x => new StackTrace().FrameCount).Take(10).ForEach(lst.Add);
  546. Assert.True(lst.Last() - lst.First() < 10);
  547. }
  548. #endif
  549. [TestMethod]
  550. public void OnErrorResumeNext_TailRecursive3()
  551. {
  552. var ex = new Exception();
  553. var res =
  554. Observable.OnErrorResumeNext(
  555. Observable.Return(1),
  556. Observable.Defer(() =>
  557. {
  558. if (ex != null)
  559. {
  560. throw ex;
  561. }
  562. return Observable.Return(-2);
  563. }),
  564. Observable.Defer(() =>
  565. {
  566. if (ex != null)
  567. {
  568. throw ex;
  569. }
  570. return Observable.Return(-1);
  571. }),
  572. Observable.Return(2)
  573. )
  574. .SequenceEqual([1, 2]);
  575. Assert.True(res.Wait());
  576. }
  577. #endif
  578. private IEnumerable<IObservable<int>> Catch_IteratorThrows_Source(Exception ex, bool b)
  579. {
  580. if (b)
  581. {
  582. throw ex;
  583. }
  584. else
  585. {
  586. yield break;
  587. }
  588. }
  589. }
  590. }