TimerTest.cs 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Disposables;
  9. using System.Reactive.Linq;
  10. using System.Text;
  11. using System.Threading;
  12. using Microsoft.Reactive.Testing;
  13. using ReactiveTests.Dummies;
  14. using Microsoft.VisualStudio.TestTools.UnitTesting;
  15. using Assert = Xunit.Assert;
  16. namespace ReactiveTests.Tests
  17. {
  18. [TestClass]
  19. public class TimerTest : ReactiveTest
  20. {
  // Checks argument validation across the Observable.Timer overloads:
  // a null scheduler or null observer must raise ArgumentNullException and
  // a negative period must raise ArgumentOutOfRangeException.
  [TestMethod]
  public void OneShotTimer_TimeSpan_ArgumentChecking()
  {
      var noScheduler = default(IScheduler);
      var negativePeriod = TimeSpan.FromSeconds(-1);

      // Null scheduler / null observer.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(TimeSpan.Zero, noScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(TimeSpan.Zero, DummyScheduler.Instance).Subscribe(null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(DateTimeOffset.Now, noScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(TimeSpan.Zero, TimeSpan.Zero, noScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(DateTimeOffset.Now, TimeSpan.Zero, noScheduler));

      // Negative period, with and without an explicit scheduler.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(TimeSpan.Zero, negativePeriod));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(TimeSpan.Zero, negativePeriod, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(DateTimeOffset.Now, negativePeriod));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(DateTimeOffset.Now, negativePeriod, DummyScheduler.Instance));
  }
  // One-shot timer with a relative due time of 300 ticks on a TestScheduler.
  // Expects the single value 0 and the completion, both recorded at tick 500
  // (presumably the scheduler's default subscription time plus 300 - confirm against TestScheduler).
  [TestMethod]
  public void OneShotTimer_TimeSpan_Basic()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(300);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(500, 0L),
          OnCompleted<long>(500)
      );
  }
  // One-shot timer with a zero due time on a TestScheduler.
  // Expects the value 0 and the completion to be recorded at tick 201.
  [TestMethod]
  public void OneShotTimer_TimeSpan_Zero()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(0);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(201, 0L),
          OnCompleted<long>(201)
      );
  }
  // Subscribes to a zero-delay one-shot timer without passing a scheduler, blocks until
  // completion is signalled, and checks that exactly one value was recorded.
  [TestMethod]
  public void OneShotTimer_TimeSpan_Zero_DefaultScheduler()
  {
      // The TestScheduler is only used here as a factory for a recording observer.
      var testScheduler = new TestScheduler();
      var recorder = testScheduler.CreateObserver<long>();
      var done = new ManualResetEvent(false);

      var timer = Observable.Timer(TimeSpan.Zero);
      timer.Subscribe(recorder.OnNext, () => done.Set());

      done.WaitOne();

      Assert.Equal(1, recorder.Messages.Count);
  }
  // One-shot timer with a negative due time (-1 tick) on a TestScheduler.
  // Expects the same recording as a zero due time: value 0 and completion at tick 201.
  [TestMethod]
  public void OneShotTimer_TimeSpan_Negative()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(-1);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(201, 0L),
          OnCompleted<long>(201)
      );
  }
  // One-shot timer with a due time of 1000 ticks on a TestScheduler.
  // Expects no messages at all to be recorded (presumably because the default
  // Start run disposes the subscription before the timer fires - confirm against TestScheduler).
  [TestMethod]
  public void OneShotTimer_TimeSpan_Disposed()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(1000);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual();
  }
  // Verifies that an exception thrown by the observer surfaces from scheduler.Start(),
  // both when OnNext throws and when OnCompleted throws.
  [TestMethod]
  public void OneShotTimer_TimeSpan_ObserverThrows()
  {
      var oneTick = TimeSpan.FromTicks(1);

      // Scenario 1: the OnNext handler throws.
      var onNextScheduler = new TestScheduler();
      var throwingOnNext = Observable.Timer(oneTick, onNextScheduler);
      throwingOnNext.Subscribe(value => { throw new InvalidOperationException(); });

      ReactiveAssert.Throws<InvalidOperationException>(() => onNextScheduler.Start());

      // Scenario 2: the OnCompleted handler throws.
      var onCompletedScheduler = new TestScheduler();
      var throwingOnCompleted = Observable.Timer(oneTick, onCompletedScheduler);
      throwingOnCompleted.Subscribe(value => { }, error => { }, () => { throw new InvalidOperationException(); });

      ReactiveAssert.Throws<InvalidOperationException>(() => onCompletedScheduler.Start());
  }
  // One-shot timer (1 ms relative due time) without an explicit scheduler:
  // the enumerated sequence must be exactly [0].
  [TestMethod]
  public void OneShotTimer_TimeSpan_DefaultScheduler()
  {
      var dueTime = TimeSpan.FromMilliseconds(1);

      var values = Observable.Timer(dueTime).ToEnumerable();

      Assert.True(values.SequenceEqual([0L]));
  }
  // One-shot timer with an absolute due time one second from now, without an explicit
  // scheduler: the enumerated sequence must be exactly [0].
  [TestMethod]
  public void OneShotTimer_DateTimeOffset_DefaultScheduler()
  {
      var dueTime = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(1);

      var values = Observable.Timer(dueTime).ToEnumerable();

      Assert.True(values.SequenceEqual([0L]));
  }
  // Timer with a 1 ms due time and a 1 ms period, without an explicit scheduler:
  // the first two enumerated values must be 0 and 1.
  [TestMethod]
  public void OneShotTimer_TimeSpan_TimeSpan_DefaultScheduler()
  {
      var oneMillisecond = TimeSpan.FromMilliseconds(1);

      var firstTwo = Observable.Timer(oneMillisecond, TimeSpan.FromMilliseconds(1)).ToEnumerable().Take(2);

      Assert.True(firstTwo.SequenceEqual([0L, 1L]));
  }
  // Timer with an absolute due time one second from now and a 1 ms period, without an
  // explicit scheduler: the first two enumerated values must be 0 and 1.
  [TestMethod]
  public void OneShotTimer_DateTimeOffset_TimeSpan_DefaultScheduler()
  {
      var dueTime = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(1);
      var period = TimeSpan.FromMilliseconds(1);

      var firstTwo = Observable.Timer(dueTime, period).ToEnumerable().Take(2);

      Assert.True(firstTwo.SequenceEqual([0L, 1L]));
  }
  // One-shot timer with an absolute due time of tick 500 on a TestScheduler.
  // Expects the value 0 and the completion to be recorded at tick 500.
  [TestMethod]
  public void OneShotTimer_DateTimeOffset_Basic()
  {
      var testScheduler = new TestScheduler();
      var dueTime = new DateTimeOffset(500, TimeSpan.Zero);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(500, 0L),
          OnCompleted<long>(500)
      );
  }
  // One-shot timer with an absolute due time of tick 200 on a TestScheduler.
  // Expects the value 0 and the completion to be recorded at tick 201.
  [TestMethod]
  public void OneShotTimer_DateTimeOffset_Zero()
  {
      var testScheduler = new TestScheduler();
      var dueTime = new DateTimeOffset(200, TimeSpan.Zero);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(201, 0L),
          OnCompleted<long>(201)
      );
  }
  // One-shot timer with an absolute due time of tick 0 on a TestScheduler.
  // Expects the value 0 and the completion to be recorded at tick 201.
  [TestMethod]
  public void OneShotTimer_DateTimeOffset_Past()
  {
      var testScheduler = new TestScheduler();
      var dueTime = new DateTimeOffset(0, TimeSpan.Zero);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(201, 0L),
          OnCompleted<long>(201)
      );
  }
  // Repeating timer with zero due time and zero period, without an explicit scheduler.
  // The sequence is cut off by TakeWhile(i < 10); the test blocks until completion
  // and checks that exactly ten values were recorded.
  [TestMethod]
  public void RepeatingTimer_TimeSpan_Zero_DefaultScheduler()
  {
      // The TestScheduler is only used here as a factory for a recording observer.
      var testScheduler = new TestScheduler();
      var recorder = testScheduler.CreateObserver<long>();
      var done = new ManualResetEvent(false);

      var firstTen = Observable.Timer(TimeSpan.Zero, TimeSpan.Zero).TakeWhile(i => i < 10);
      firstTen.Subscribe(recorder.OnNext, () => done.Set());

      done.WaitOne();

      Assert.Equal(10, recorder.Messages.Count);
  }
  // Repeating timer with an absolute first due time of tick 300 and a period of 100 ticks.
  // Start is called with the values 0, 200, 750 (presumably created / subscribed / disposed
  // times - confirm against TestScheduler.Start); values 0..4 are expected at 300..700.
  [TestMethod]
  public void RepeatingTimer_DateTimeOffset_TimeSpan_Simple()
  {
      var testScheduler = new TestScheduler();
      var firstDue = new DateTimeOffset(300, TimeSpan.Zero);
      var period = TimeSpan.FromTicks(100);

      var observed = testScheduler.Start(
          () => Observable.Timer(firstDue, period, testScheduler),
          0, 200, 750
      );

      observed.Messages.AssertEqual(
          OnNext(300, 0L),
          OnNext(400, 1L),
          OnNext(500, 2L),
          OnNext(600, 3L),
          OnNext(700, 4L)
      );
  }
  // Repeating timer with a relative due time of 100 ticks and a period of 100 ticks.
  // Start is called with the values 0, 200, 750 (presumably created / subscribed / disposed
  // times - confirm against TestScheduler.Start); values 0..4 are expected at 300..700.
  [TestMethod]
  public void RepeatingTimer_TimeSpan_TimeSpan_Simple()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);
      var period = TimeSpan.FromTicks(100);

      var observed = testScheduler.Start(
          () => Observable.Timer(dueTime, period, testScheduler),
          0, 200, 750
      );

      observed.Messages.AssertEqual(
          OnNext(300, 0L),
          OnNext(400, 1L),
          OnNext(500, 2L),
          OnNext(600, 3L),
          OnNext(700, 4L)
      );
  }
  // Repeating timer (due time 50, period 100) on a PeriodicTestScheduler.
  // Besides the emitted values, on non-WINDOWS builds the test also checks the timer run
  // recorded by the scheduler: started at 250, ticks at 350..650, stopped at 700.
  [TestMethod]
  public void RepeatingTimer_Periodic1()
  {
      var periodicScheduler = new PeriodicTestScheduler();
      var dueTime = TimeSpan.FromTicks(50);
      var period = TimeSpan.FromTicks(100);

      var observed = periodicScheduler.Start(
          () => Observable.Timer(dueTime, period, periodicScheduler),
          0, 200, 700
      );

      observed.Messages.AssertEqual(
          OnNext(250, 0L),
          OnNext(350, 1L),
          OnNext(450, 2L),
          OnNext(550, 3L),
          OnNext(650, 4L)
      );

#if !WINDOWS
      var expectedRun = new TimerRun(250, 700) { 350, 450, 550, 650 };
      periodicScheduler.Timers.AssertEqual(expectedRun);
#endif
  }
  // Repeating timer whose due time equals its period (100 ticks) on a PeriodicTestScheduler.
  // On non-WINDOWS builds the recorded timer run is expected to start at 200, tick at
  // 300..700 and stop at 750.
  [TestMethod]
  public void RepeatingTimer_Periodic2()
  {
      var periodicScheduler = new PeriodicTestScheduler();
      var dueTime = TimeSpan.FromTicks(100);
      var period = TimeSpan.FromTicks(100);

      var observed = periodicScheduler.Start(
          () => Observable.Timer(dueTime, period, periodicScheduler),
          0, 200, 750
      );

      observed.Messages.AssertEqual(
          OnNext(300, 0L),
          OnNext(400, 1L),
          OnNext(500, 2L),
          OnNext(600, 3L),
          OnNext(700, 4L)
      );

#if !WINDOWS
      var expectedRun = new TimerRun(200, 750) { 300, 400, 500, 600, 700 };
      periodicScheduler.Timers.AssertEqual(expectedRun);
#endif
  }
  // Repeating timer (zero due time, period 100) on a TestScheduler whose OnNext handler
  // advances the virtual clock with Sleep to simulate slow observers. The recorded clock
  // values document how the timer slips while the handler is slow and then gets back in step.
  [TestMethod]
  public void RepeatingTimer_UsingStopwatch_Slippage1()
  {
      var testScheduler = new TestScheduler();

      var source = default(IObservable<long>);
      testScheduler.ScheduleAbsolute(100, () => { source = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), testScheduler); });

      var observedAt = new List<long>();

      Action<long> handler = tick =>
      {
          observedAt.Add(testScheduler.Clock);

          // Cases are checked in order: 0 does nothing, other values below 2 sleep 50,
          // values below 4 sleep 120, values below 6 sleep 50, everything else does nothing.
          switch (tick)
          {
              case 0:
                  break;
              case < 2:
                  testScheduler.Sleep(50);
                  break;
              case < 4:
                  testScheduler.Sleep(120);
                  break;
              case < 6:
                  testScheduler.Sleep(50);
                  break;
          }
      };

      var subscription = default(IDisposable);
      testScheduler.ScheduleAbsolute(200, () => { subscription = source.Subscribe(handler); });
      testScheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      testScheduler.Start();

      observedAt.AssertEqual(
          201, // 1 off because of initial scheduling jump (InvokeStart)
          301,
          401,
          522, // 2 off because of 401 + 120 + 1 scheduling tick
          643, // 3 off because of 522 + 120 + 1 scheduling tick
          701,
          801,
          901
      );
  }
  // Same slow-observer scenario as the zero-due-time variant, but with a due time of
  // 100 ticks (period 100). The recorded clock values document the slippage and recovery.
  [TestMethod]
  public void RepeatingTimer_UsingStopwatch_Slippage2()
  {
      var testScheduler = new TestScheduler();

      var source = default(IObservable<long>);
      testScheduler.ScheduleAbsolute(100, () => { source = Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), testScheduler); });

      var observedAt = new List<long>();

      Action<long> handler = tick =>
      {
          observedAt.Add(testScheduler.Clock);

          // Cases are checked in order: 0 does nothing, other values below 2 sleep 50,
          // values below 4 sleep 120, values below 6 sleep 50, everything else does nothing.
          switch (tick)
          {
              case 0:
                  break;
              case < 2:
                  testScheduler.Sleep(50);
                  break;
              case < 4:
                  testScheduler.Sleep(120);
                  break;
              case < 6:
                  testScheduler.Sleep(50);
                  break;
          }
      };

      var subscription = default(IDisposable);
      testScheduler.ScheduleAbsolute(200, () => { subscription = source.Subscribe(handler); });
      testScheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      testScheduler.Start();

      observedAt.AssertEqual(
          300,
          400,
          500,
          621, // 1 off because of recursive scheduling beyond the target time
          742, // 2 off because of 621 + 120 + 1 scheduling tick
          800,
          900
      );
  }
  // Repeating timer (zero due time, period 100) whose handler sleeps 350 ticks on the
  // very first value. The expected clock values show the missed ticks being delivered
  // back-to-back before the timer is in sync again.
  [TestMethod]
  public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart()
  {
      var testScheduler = new TestScheduler();

      var source = default(IObservable<long>);
      testScheduler.ScheduleAbsolute(100, () => { source = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), testScheduler); });

      var observedAt = new List<long>();

      Action<long> handler = tick =>
      {
          observedAt.Add(testScheduler.Clock);

          // Only the first value is slow.
          if (tick == 0)
          {
              testScheduler.Sleep(350);
          }
      };

      var subscription = default(IDisposable);
      testScheduler.ScheduleAbsolute(200, () => { subscription = source.Subscribe(handler); });
      testScheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      testScheduler.Start();

      observedAt.AssertEqual(
          201, // 1 off because of initial scheduling jump (InvokeStart)
          551, // catching up after excessive delay of 350 (target was 300)
          552, // catching up after excessive delay of 350 (target was 400)
          553, // catching up after excessive delay of 350 (target was 500)
          601, // back in sync
          701,
          801,
          901
      );
  }
  // Repeating timer (zero due time, period 100) whose OnNext handler throws on the very
  // first value. The exception must propagate out of scheduler.Start() with the virtual
  // clock at 201.
  //
  // The exception is captured and asserted on after the try block so that the test fails
  // if Start() returns without throwing, instead of passing with no assertion executed.
  [TestMethod]
  public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart_ThrowsFirst()
  {
      var ex = new Exception();
      var scheduler = new TestScheduler();

      var xs = default(IObservable<long>);
      scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });

      var onNext = new Action<long>(x =>
      {
          if (x == 0)
          {
              throw ex;
          }
      });

      var d = default(IDisposable);
      scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
      scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

      var caught = default(Exception);
      try
      {
          scheduler.Start();
      }
      catch (Exception e)
      {
          caught = e;
      }

      // Fails when nothing was thrown (caught is null) or a different exception escaped.
      Assert.Same(ex, caught);
      Assert.Equal(201, scheduler.Clock);
  }
  // Repeating timer (zero due time, period 100) whose handler sleeps 350 ticks on the
  // first value and throws on value 5. Checks the exception escaping scheduler.Start()
  // and the clock values recorded up to and including the throwing notification.
  [TestMethod]
  public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart_ThrowsBeyondFirst()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var source = default(IObservable<long>);
      testScheduler.ScheduleAbsolute(100, () => { source = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), testScheduler); });

      var observedAt = new List<long>();

      Action<long> handler = tick =>
      {
          observedAt.Add(testScheduler.Clock);

          if (tick == 0)
          {
              // Slow first notification.
              testScheduler.Sleep(350);
          }
          else if (tick == 5)
          {
              throw failure;
          }
      };

      var subscription = default(IDisposable);
      testScheduler.ScheduleAbsolute(200, () => { subscription = source.Subscribe(handler); });
      testScheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      try
      {
          testScheduler.Start();
      }
      catch (Exception thrown)
      {
          Assert.Equal(701, testScheduler.Clock);
          Assert.Same(failure, thrown);
      }

      observedAt.AssertEqual(
          201, // 1 off because of initial scheduling jump (InvokeStart)
          551, // catching up after excessive delay of 350 (target was 300)
          552, // catching up after excessive delay of 350 (target was 400)
          553, // catching up after excessive delay of 350 (target was 500)
          601, // back in sync
          701
      );
  }
  // Slow-observer scenario (zero due time, period 100) on a TestScheduler wrapped with
  // DisableOptimizations(typeof(IStopwatchProvider)). The expected clock values document
  // the larger, non-recovering drift in this configuration.
  [TestMethod]
  public void RepeatingTimer_NoStopwatch_Slippage1()
  {
      var testScheduler = new TestScheduler();

      var source = default(IObservable<long>);
      testScheduler.ScheduleAbsolute(100, () => { source = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), testScheduler.DisableOptimizations(typeof(IStopwatchProvider))); });

      var observedAt = new List<long>();

      Action<long> handler = tick =>
      {
          observedAt.Add(testScheduler.Clock);

          // Cases are checked in order: 0 does nothing, other values below 2 sleep 50,
          // values below 4 sleep 120, values below 6 sleep 50, everything else does nothing.
          switch (tick)
          {
              case 0:
                  break;
              case < 2:
                  testScheduler.Sleep(50);
                  break;
              case < 4:
                  testScheduler.Sleep(120);
                  break;
              case < 6:
                  testScheduler.Sleep(50);
                  break;
          }
      };

      var subscription = default(IDisposable);
      testScheduler.ScheduleAbsolute(200, () => { subscription = source.Subscribe(handler); });
      testScheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      testScheduler.Start();

      observedAt.AssertEqual(
          201, // 1 off because of initial scheduling jump (InvokeStart)
          301,
          401,
          523, // 3 off because of 401 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
          645, // 5 off because of 523 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
          743, // \
          843, //  +--> 43 off because this situation (no stopwatch or periodic scheduling interface) only gets best effort treatment (see SchedulePeriodic emulation code)
          943  // /
      );
  }
  // Slow-observer scenario (due time 100, period 100) on a TestScheduler wrapped with
  // DisableOptimizations(typeof(IStopwatchProvider)). The expected clock values document
  // the larger, non-recovering drift in this configuration.
  [TestMethod]
  public void RepeatingTimer_NoStopwatch_Slippage2()
  {
      var testScheduler = new TestScheduler();

      var source = default(IObservable<long>);
      testScheduler.ScheduleAbsolute(100, () => { source = Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), testScheduler.DisableOptimizations(typeof(IStopwatchProvider))); });

      var observedAt = new List<long>();

      Action<long> handler = tick =>
      {
          observedAt.Add(testScheduler.Clock);

          // Cases are checked in order: 0 does nothing, other values below 2 sleep 50,
          // values below 4 sleep 120, values below 6 sleep 50, everything else does nothing.
          switch (tick)
          {
              case 0:
                  break;
              case < 2:
                  testScheduler.Sleep(50);
                  break;
              case < 4:
                  testScheduler.Sleep(120);
                  break;
              case < 6:
                  testScheduler.Sleep(50);
                  break;
          }
      };

      var subscription = default(IDisposable);
      testScheduler.ScheduleAbsolute(200, () => { subscription = source.Subscribe(handler); });
      testScheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      testScheduler.Start();

      observedAt.AssertEqual(
          300,
          400,
          500,
          622, // 2 off because of 500 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
          744, // 4 off because of 622 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
          842, // |
          942  // +--> 42 off because this situation (no stopwatch or periodic scheduling interface) only gets best effort treatment (see SchedulePeriodic emulation code)
      );
  }
  // Real-time variant of the catch-up scenario: a 10 ms repeating timer without an explicit
  // scheduler whose handler blocks for 500 ms on the first value. The test passes once a
  // value greater than 10 has been observed, at which point the subscription is disposed.
  [TestMethod]
  public void RepeatingTimer_Start_CatchUp()
  {
      var reachedTarget = new ManualResetEvent(false);
      var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10));

      // Assigned after Subscribe returns so the handler can dispose its own subscription.
      var subscription = new SingleAssignmentDisposable();

      subscription.Disposable = timer.Subscribe(tick =>
      {
          if (tick == 0)
          {
              // Stall the first notification so later ticks become overdue.
              Thread.Sleep(500);
          }

          if (tick > 10)
          {
              reachedTarget.Set();
              subscription.Dispose();
          }
      });

      reachedTarget.WaitOne();
  }
  // Real-time catch-up scenario where the handler throws on value 5. The timer runs on
  // ThreadPoolScheduler.Instance wrapped with Catch<Exception>; the catch handler stores the
  // exception and signals the test, which then checks it is the very instance thrown.
  [TestMethod]
  public void RepeatingTimer_Start_CatchUp_Throws()
  {
      var handlerInvoked = new ManualResetEvent(false);
      var thrown = new Exception();
      var captured = default(Exception);

      var catchingScheduler = ThreadPoolScheduler.Instance.Catch<Exception>(error =>
      {
          Interlocked.Exchange(ref captured, error);
          handlerInvoked.Set();
          return true;
      });

      var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10), catchingScheduler);

      timer.Subscribe(tick =>
      {
          if (tick == 0)
          {
              // Stall the first notification so later ticks become overdue.
              Thread.Sleep(500);
          }
          else if (tick == 5)
          {
              throw thrown;
          }
      });

      handlerInvoked.WaitOne();

      Assert.Same(thrown, captured);
  }
  583. }
  // IScheduler decorator that forwards all scheduling calls to an inner scheduler and
  // wraps every scheduled action in a try/catch. Exceptions thrown by the action are
  // handed to the supplied callback instead of propagating, and Disposable.Empty is
  // returned for that invocation.
  internal class SchedulerWithCatch : IServiceProvider, IScheduler
  {
      private readonly IScheduler _scheduler;
      private readonly Action<Exception> _setException;

      // scheduler: the inner scheduler that actually runs the work.
      // setException: callback invoked with any exception thrown by scheduled work.
      public SchedulerWithCatch(IScheduler scheduler, Action<Exception> setException)
      {
          _scheduler = scheduler;
          _setException = setException;
      }

      // Forwards service discovery to the inner scheduler. Returns null when the inner
      // scheduler does not implement IServiceProvider, instead of failing with an
      // InvalidCastException.
      public object GetService(Type serviceType)
      {
          return (_scheduler as IServiceProvider)?.GetService(serviceType);
      }

      // Current time as reported by the inner scheduler.
      public DateTimeOffset Now
      {
          get { return _scheduler.Now; }
      }

      // Schedules the action on the inner scheduler, guarded by the exception callback.
      public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
      {
          return _scheduler.Schedule(state, GetCatch(action));
      }

      // Schedules the action after a relative due time, guarded by the exception callback.
      public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          return _scheduler.Schedule(state, dueTime, GetCatch(action));
      }

      // Schedules the action at an absolute due time, guarded by the exception callback.
      public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          return _scheduler.Schedule(state, dueTime, GetCatch(action));
      }

      // Wraps the action so that (a) the scheduler it receives is itself a SchedulerWithCatch,
      // keeping work it schedules in turn guarded too, and (b) any exception is routed to
      // the callback.
      private Func<IScheduler, TState, IDisposable> GetCatch<TState>(Func<IScheduler, TState, IDisposable> action)
      {
          return (self, s) =>
          {
              try
              {
                  return action(new SchedulerWithCatch(self, _setException), s);
              }
              catch (Exception ex)
              {
                  _setException(ex);
                  return Disposable.Empty;
              }
          };
      }
  }
  // TestScheduler that also implements ISchedulerPeriodic and keeps a TimerRun record
  // (start clock, tick clocks, stop clock) for every periodic timer scheduled through it.
  internal class PeriodicTestScheduler : TestScheduler, ISchedulerPeriodic, IServiceProvider
  {
      private readonly List<TimerRun> _timers = [];

      // All periodic timer runs recorded so far, in the order they were scheduled.
      public List<TimerRun> Timers
      {
          get { return _timers; }
      }

      // Schedules recurring work with the given period. Each firing records the current
      // clock in the run, threads the state through the action and re-schedules itself.
      // Disposing the result records the stop clock and cancels the recurring work.
      public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
      {
          var timerRun = new TimerRun(Clock);
          _timers.Add(timerRun);

          var current = state;

          var recurring = this.Schedule(period, reschedule =>
          {
              timerRun.Add(Clock);
              current = action(current);
              reschedule(period);
          });

          var markStopped = Disposable.Create(() => { timerRun.Stop(Clock); });

          return new CompositeDisposable(markStopped, recurring);
      }

      // Advertises this instance as the ISchedulerPeriodic service; every other request
      // is answered by the base class.
      protected override object GetService(Type serviceType)
      {
          return serviceType == typeof(ISchedulerPeriodic)
              ? this
              : base.GetService(serviceType);
      }
  }
  // Record of one periodic timer's lifetime in virtual time: the clock value at which it
  // started, the clock value of every tick, and (once stopped) the clock value at which
  // it stopped. Enumerating a TimerRun yields the tick clocks, and the public Add method
  // allows collection-initializer syntax, e.g. new TimerRun(250, 700) { 350, 450 }.
  internal class TimerRun : IEnumerable<long>
  {
      private readonly long _started;
      private readonly List<long> _ticks = [];
      private long _stopped;
      private bool _hasStopped;

      // Creates a run that started at the given clock and has not stopped yet.
      public TimerRun(long started)
      {
          _started = started;
      }

      // Creates a run that started and stopped at the given clocks.
      public TimerRun(long started, long stopped)
      {
          _started = started;
          _stopped = stopped;
          _hasStopped = true;
      }

      // Clock value at which the run started.
      public long Started
      {
          get { return _started; }
      }

      // Clock values of the recorded ticks, in insertion order.
      public IEnumerable<long> Ticks
      {
          get { return _ticks; }
      }

      // Clock value at which the run stopped; 0 while the run has not been stopped.
      public long Stopped
      {
          get { return _stopped; }
      }

      // Records a tick at the given clock value.
      public void Add(long clock)
      {
          _ticks.Add(clock);
      }

      // Marks the run as stopped at the given clock value.
      internal void Stop(long clock)
      {
          _stopped = clock;
          _hasStopped = true;
      }

      // Constant hash: instances are mutable (Add/Stop), so a state-based hash would change
      // over the object's lifetime. A constant stays consistent with Equals.
      public override int GetHashCode()
      {
          return 0;
      }

      // Two runs are equal when they have the same start clock, the same stopped state and
      // stop clock, and the same tick sequence. The stopped flag is compared so that a run
      // that never stopped does not equal a run stopped at clock 0, matching ToString.
      public override bool Equals(object obj)
      {
          return obj is TimerRun other
              && _started == other._started
              && _hasStopped == other._hasStopped
              && _stopped == other._stopped
              && _ticks.SequenceEqual(other._ticks);
      }

      // Renders "Start(s) Ticks(t1, t2, ...) " followed by "Stop(e)" once the run has stopped.
      public override string ToString()
      {
          var sb = new StringBuilder();

          sb.Append("Start(").Append(_started).Append(") ");
          sb.Append("Ticks(").Append(string.Join(", ", _ticks)).Append(") ");

          if (_hasStopped)
          {
              sb.Append("Stop(").Append(_stopped).Append(")");
          }

          return sb.ToString();
      }

      public IEnumerator<long> GetEnumerator()
      {
          return _ticks.GetEnumerator();
      }

      System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
      {
          return _ticks.GetEnumerator();
      }
  }
  730. }