TimerTest.cs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Disposables;
  9. using System.Reactive.Linq;
  10. using System.Text;
  11. using System.Threading;
  12. using Microsoft.Reactive.Testing;
  13. using ReactiveTests.Dummies;
  14. using Xunit;
  15. namespace ReactiveTests.Tests
  16. {
  17. public class TimerTest : ReactiveTest
  18. {
  19. [Fact]
  20. public void OneShotTimer_TimeSpan_ArgumentChecking()
  21. {
  22. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(TimeSpan.Zero, null));
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(TimeSpan.Zero, DummyScheduler.Instance).Subscribe(null));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(DateTimeOffset.Now, null));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(TimeSpan.Zero, TimeSpan.Zero, null));
  26. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(DateTimeOffset.Now, TimeSpan.Zero, null));
  27. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(-1)));
  28. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(-1), DummyScheduler.Instance));
  29. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(DateTimeOffset.Now, TimeSpan.FromSeconds(-1)));
  30. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(DateTimeOffset.Now, TimeSpan.FromSeconds(-1), DummyScheduler.Instance));
  31. }
  32. [Fact]
  33. public void OneShotTimer_TimeSpan_Basic()
  34. {
  35. var scheduler = new TestScheduler();
  36. var res = scheduler.Start(() =>
  37. Observable.Timer(TimeSpan.FromTicks(300), scheduler)
  38. );
  39. res.Messages.AssertEqual(
  40. OnNext(500, 0L),
  41. OnCompleted<long>(500)
  42. );
  43. }
  44. [Fact]
  45. public void OneShotTimer_TimeSpan_Zero()
  46. {
  47. var scheduler = new TestScheduler();
  48. var res = scheduler.Start(() =>
  49. Observable.Timer(TimeSpan.FromTicks(0), scheduler)
  50. );
  51. res.Messages.AssertEqual(
  52. OnNext(201, 0L),
  53. OnCompleted<long>(201)
  54. );
  55. }
  56. [Fact]
  57. public void OneShotTimer_TimeSpan_Zero_DefaultScheduler()
  58. {
  59. var scheduler = new TestScheduler();
  60. var observer = scheduler.CreateObserver<long>();
  61. var completed = new ManualResetEvent(false);
  62. Observable.Timer(TimeSpan.Zero).Subscribe(observer.OnNext, () => completed.Set());
  63. completed.WaitOne();
  64. Assert.Equal(1, observer.Messages.Count);
  65. }
  66. [Fact]
  67. public void OneShotTimer_TimeSpan_Negative()
  68. {
  69. var scheduler = new TestScheduler();
  70. var res = scheduler.Start(() =>
  71. Observable.Timer(TimeSpan.FromTicks(-1), scheduler)
  72. );
  73. res.Messages.AssertEqual(
  74. OnNext(201, 0L),
  75. OnCompleted<long>(201)
  76. );
  77. }
  78. [Fact]
  79. public void OneShotTimer_TimeSpan_Disposed()
  80. {
  81. var scheduler = new TestScheduler();
  82. var res = scheduler.Start(() =>
  83. Observable.Timer(TimeSpan.FromTicks(1000), scheduler)
  84. );
  85. res.Messages.AssertEqual(
  86. );
  87. }
  88. [Fact]
  89. public void OneShotTimer_TimeSpan_ObserverThrows()
  90. {
  91. var scheduler1 = new TestScheduler();
  92. var xs = Observable.Timer(TimeSpan.FromTicks(1), scheduler1);
  93. xs.Subscribe(x => { throw new InvalidOperationException(); });
  94. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  95. var scheduler2 = new TestScheduler();
  96. var ys = Observable.Timer(TimeSpan.FromTicks(1), scheduler2);
  97. ys.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  98. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
  99. }
  100. [Fact]
  101. public void OneShotTimer_TimeSpan_DefaultScheduler()
  102. {
  103. Assert.True(Observable.Timer(TimeSpan.FromMilliseconds(1)).ToEnumerable().SequenceEqual(new[] { 0L }));
  104. }
  105. [Fact]
  106. public void OneShotTimer_DateTimeOffset_DefaultScheduler()
  107. {
  108. Assert.True(Observable.Timer(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(1)).ToEnumerable().SequenceEqual(new[] { 0L }));
  109. }
  110. [Fact]
  111. public void OneShotTimer_TimeSpan_TimeSpan_DefaultScheduler()
  112. {
  113. Assert.True(Observable.Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1)).ToEnumerable().Take(2).SequenceEqual(new[] { 0L, 1L }));
  114. }
  115. [Fact]
  116. public void OneShotTimer_DateTimeOffset_TimeSpan_DefaultScheduler()
  117. {
  118. Assert.True(Observable.Timer(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(1)).ToEnumerable().Take(2).SequenceEqual(new[] { 0L, 1L }));
  119. }
  120. [Fact]
  121. public void OneShotTimer_DateTimeOffset_Basic()
  122. {
  123. var scheduler = new TestScheduler();
  124. var res = scheduler.Start(() =>
  125. Observable.Timer(new DateTimeOffset(500, TimeSpan.Zero), scheduler)
  126. );
  127. res.Messages.AssertEqual(
  128. OnNext(500, 0L),
  129. OnCompleted<long>(500)
  130. );
  131. }
  132. [Fact]
  133. public void OneShotTimer_DateTimeOffset_Zero()
  134. {
  135. var scheduler = new TestScheduler();
  136. var res = scheduler.Start(() =>
  137. Observable.Timer(new DateTimeOffset(200, TimeSpan.Zero), scheduler)
  138. );
  139. res.Messages.AssertEqual(
  140. OnNext(201, 0L),
  141. OnCompleted<long>(201)
  142. );
  143. }
  144. [Fact]
  145. public void OneShotTimer_DateTimeOffset_Past()
  146. {
  147. var scheduler = new TestScheduler();
  148. var res = scheduler.Start(() =>
  149. Observable.Timer(new DateTimeOffset(0, TimeSpan.Zero), scheduler)
  150. );
  151. res.Messages.AssertEqual(
  152. OnNext(201, 0L),
  153. OnCompleted<long>(201)
  154. );
  155. }
  156. [Fact]
  157. public void RepeatingTimer_TimeSpan_Zero_DefaultScheduler()
  158. {
  159. var scheduler = new TestScheduler();
  160. var observer = scheduler.CreateObserver<long>();
  161. var completed = new ManualResetEvent(false);
  162. Observable.Timer(TimeSpan.Zero, TimeSpan.Zero).TakeWhile(i => i < 10).Subscribe(observer.OnNext, () => completed.Set());
  163. completed.WaitOne();
  164. Assert.Equal(10, observer.Messages.Count);
  165. }
  166. [Fact]
  167. public void RepeatingTimer_DateTimeOffset_TimeSpan_Simple()
  168. {
  169. var scheduler = new TestScheduler();
  170. var res = scheduler.Start(() =>
  171. Observable.Timer(new DateTimeOffset(300, TimeSpan.Zero), TimeSpan.FromTicks(100), scheduler),
  172. 0, 200, 750
  173. );
  174. res.Messages.AssertEqual(
  175. OnNext(300, 0L),
  176. OnNext(400, 1L),
  177. OnNext(500, 2L),
  178. OnNext(600, 3L),
  179. OnNext(700, 4L)
  180. );
  181. }
  182. [Fact]
  183. public void RepeatingTimer_TimeSpan_TimeSpan_Simple()
  184. {
  185. var scheduler = new TestScheduler();
  186. var res = scheduler.Start(() =>
  187. Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler),
  188. 0, 200, 750
  189. );
  190. res.Messages.AssertEqual(
  191. OnNext(300, 0L),
  192. OnNext(400, 1L),
  193. OnNext(500, 2L),
  194. OnNext(600, 3L),
  195. OnNext(700, 4L)
  196. );
  197. }
  198. [Fact]
  199. public void RepeatingTimer_Periodic1()
  200. {
  201. var scheduler = new PeriodicTestScheduler();
  202. var res = scheduler.Start(() =>
  203. Observable.Timer(TimeSpan.FromTicks(50), TimeSpan.FromTicks(100), scheduler),
  204. 0, 200, 700
  205. );
  206. res.Messages.AssertEqual(
  207. OnNext(250, 0L),
  208. OnNext(350, 1L),
  209. OnNext(450, 2L),
  210. OnNext(550, 3L),
  211. OnNext(650, 4L)
  212. );
  213. #if !WINDOWS
  214. scheduler.Timers.AssertEqual(
  215. new TimerRun(250, 700) { 350, 450, 550, 650 }
  216. );
  217. #endif
  218. }
  219. [Fact]
  220. public void RepeatingTimer_Periodic2()
  221. {
  222. var scheduler = new PeriodicTestScheduler();
  223. var res = scheduler.Start(() =>
  224. Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler),
  225. 0, 200, 750
  226. );
  227. res.Messages.AssertEqual(
  228. OnNext(300, 0L),
  229. OnNext(400, 1L),
  230. OnNext(500, 2L),
  231. OnNext(600, 3L),
  232. OnNext(700, 4L)
  233. );
  234. #if !WINDOWS
  235. scheduler.Timers.AssertEqual(
  236. new TimerRun(200, 750) { 300, 400, 500, 600, 700 }
  237. );
  238. #endif
  239. }
  240. [Fact]
  241. public void RepeatingTimer_UsingStopwatch_Slippage1()
  242. {
  243. var scheduler = new TestScheduler();
  244. var xs = default(IObservable<long>);
  245. scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });
  246. var times = new List<long>();
  247. var onNext = new Action<long>(x =>
  248. {
  249. times.Add(scheduler.Clock);
  250. if (x == 0)
  251. {
  252. return;
  253. }
  254. if (x < 2)
  255. {
  256. scheduler.Sleep(50);
  257. return;
  258. }
  259. if (x < 4)
  260. {
  261. scheduler.Sleep(120);
  262. return;
  263. }
  264. if (x < 6)
  265. {
  266. scheduler.Sleep(50);
  267. return;
  268. }
  269. if (x < 8)
  270. {
  271. return;
  272. }
  273. });
  274. var d = default(IDisposable);
  275. scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
  276. scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });
  277. scheduler.Start();
  278. times.AssertEqual(
  279. 201, // 1 off because of initial scheduling jump (InvokeStart)
  280. 301,
  281. 401,
  282. 522, // 2 off because of 401 + 120 + 1 scheduling tick
  283. 643, // 3 off because of 522 + 120 + 1 scheduling tick
  284. 701,
  285. 801,
  286. 901
  287. );
  288. }
  289. [Fact]
  290. public void RepeatingTimer_UsingStopwatch_Slippage2()
  291. {
  292. var scheduler = new TestScheduler();
  293. var xs = default(IObservable<long>);
  294. scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler); });
  295. var times = new List<long>();
  296. var onNext = new Action<long>(x =>
  297. {
  298. times.Add(scheduler.Clock);
  299. if (x == 0)
  300. {
  301. return;
  302. }
  303. if (x < 2)
  304. {
  305. scheduler.Sleep(50);
  306. return;
  307. }
  308. if (x < 4)
  309. {
  310. scheduler.Sleep(120);
  311. return;
  312. }
  313. if (x < 6)
  314. {
  315. scheduler.Sleep(50);
  316. return;
  317. }
  318. if (x < 8)
  319. {
  320. return;
  321. }
  322. });
  323. var d = default(IDisposable);
  324. scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
  325. scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });
  326. scheduler.Start();
  327. times.AssertEqual(
  328. 300,
  329. 400,
  330. 500,
  331. 621, // 1 off because of recursive scheduling beyond the target time
  332. 742, // 2 off because of 621 + 120 + 1 scheduling tick
  333. 800,
  334. 900
  335. );
  336. }
  337. [Fact]
  338. public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart()
  339. {
  340. var scheduler = new TestScheduler();
  341. var xs = default(IObservable<long>);
  342. scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });
  343. var times = new List<long>();
  344. var onNext = new Action<long>(x =>
  345. {
  346. times.Add(scheduler.Clock);
  347. if (x == 0)
  348. {
  349. scheduler.Sleep(350);
  350. return;
  351. }
  352. });
  353. var d = default(IDisposable);
  354. scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
  355. scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });
  356. scheduler.Start();
  357. times.AssertEqual(
  358. 201, // 1 off because of initial scheduling jump (InvokeStart)
  359. 551, // catching up after excessive delay of 350 (target was 300)
  360. 552, // catching up after excessive delay of 350 (target was 400)
  361. 553, // catching up after excessive delay of 350 (target was 500)
  362. 601, // back in sync
  363. 701,
  364. 801,
  365. 901
  366. );
  367. }
  368. [Fact]
  369. public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart_ThrowsFirst()
  370. {
  371. var ex = new Exception();
  372. var scheduler = new TestScheduler();
  373. var xs = default(IObservable<long>);
  374. scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });
  375. var onNext = new Action<long>(x =>
  376. {
  377. if (x == 0)
  378. {
  379. throw ex;
  380. }
  381. });
  382. var d = default(IDisposable);
  383. scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
  384. scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });
  385. try
  386. {
  387. scheduler.Start();
  388. }
  389. catch (Exception e)
  390. {
  391. Assert.Equal(201, scheduler.Clock);
  392. Assert.Same(ex, e);
  393. }
  394. }
  395. [Fact]
  396. public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart_ThrowsBeyondFirst()
  397. {
  398. var ex = new Exception();
  399. var scheduler = new TestScheduler();
  400. var xs = default(IObservable<long>);
  401. scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });
  402. var times = new List<long>();
  403. var onNext = new Action<long>(x =>
  404. {
  405. times.Add(scheduler.Clock);
  406. if (x == 0)
  407. {
  408. scheduler.Sleep(350);
  409. return;
  410. }
  411. if (x == 5)
  412. {
  413. throw ex;
  414. }
  415. });
  416. var d = default(IDisposable);
  417. scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
  418. scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });
  419. try
  420. {
  421. scheduler.Start();
  422. }
  423. catch (Exception e)
  424. {
  425. Assert.Equal(701, scheduler.Clock);
  426. Assert.Same(ex, e);
  427. }
  428. times.AssertEqual(
  429. 201, // 1 off because of initial scheduling jump (InvokeStart)
  430. 551, // catching up after excessive delay of 350 (target was 300)
  431. 552, // catching up after excessive delay of 350 (target was 400)
  432. 553, // catching up after excessive delay of 350 (target was 500)
  433. 601, // back in sync
  434. 701
  435. );
  436. }
  437. [Fact]
  438. public void RepeatingTimer_NoStopwatch_Slippage1()
  439. {
  440. var scheduler = new TestScheduler();
  441. var xs = default(IObservable<long>);
  442. scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler.DisableOptimizations(typeof(IStopwatchProvider))); });
  443. var times = new List<long>();
  444. var onNext = new Action<long>(x =>
  445. {
  446. times.Add(scheduler.Clock);
  447. if (x == 0)
  448. {
  449. return;
  450. }
  451. if (x < 2)
  452. {
  453. scheduler.Sleep(50);
  454. return;
  455. }
  456. if (x < 4)
  457. {
  458. scheduler.Sleep(120);
  459. return;
  460. }
  461. if (x < 6)
  462. {
  463. scheduler.Sleep(50);
  464. return;
  465. }
  466. if (x < 8)
  467. {
  468. return;
  469. }
  470. });
  471. var d = default(IDisposable);
  472. scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
  473. scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });
  474. scheduler.Start();
  475. times.AssertEqual(
  476. 201, // 1 off because of initial scheduling jump (InvokeStart)
  477. 301,
  478. 401,
  479. 523, // 3 off because of 401 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
  480. 645, // 5 off because of 523 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
  481. 743, // \
  482. 843, // +--> 43 off because this situation (no stopwatch or periodic scheduling interface) only gets best effort treatment (see SchedulePeriodic emulation code)
  483. 943 // /
  484. );
  485. }
  486. [Fact]
  487. public void RepeatingTimer_NoStopwatch_Slippage2()
  488. {
  489. var scheduler = new TestScheduler();
  490. var xs = default(IObservable<long>);
  491. scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler.DisableOptimizations(typeof(IStopwatchProvider))); });
  492. var times = new List<long>();
  493. var onNext = new Action<long>(x =>
  494. {
  495. times.Add(scheduler.Clock);
  496. if (x == 0)
  497. {
  498. return;
  499. }
  500. if (x < 2)
  501. {
  502. scheduler.Sleep(50);
  503. return;
  504. }
  505. if (x < 4)
  506. {
  507. scheduler.Sleep(120);
  508. return;
  509. }
  510. if (x < 6)
  511. {
  512. scheduler.Sleep(50);
  513. return;
  514. }
  515. if (x < 8)
  516. {
  517. return;
  518. }
  519. });
  520. var d = default(IDisposable);
  521. scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
  522. scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });
  523. scheduler.Start();
  524. times.AssertEqual(
  525. 300,
  526. 400,
  527. 500,
  528. 622, // 2 off because of 500 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
  529. 744, // 4 off because of 622 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
  530. 842, // |
  531. 942 // +--> 42 off because this situation (no stopwatch or periodic scheduling interface) only gets best effort treatment (see SchedulePeriodic emulation code)
  532. );
  533. }
  534. #if !NO_THREAD
  535. [Fact]
  536. public void RepeatingTimer_Start_CatchUp()
  537. {
  538. var e = new ManualResetEvent(false);
  539. var xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10));
  540. var d = new SingleAssignmentDisposable();
  541. d.Disposable = xs.Subscribe(x =>
  542. {
  543. if (x == 0)
  544. {
  545. Thread.Sleep(500);
  546. }
  547. if (x > 10)
  548. {
  549. e.Set();
  550. d.Dispose();
  551. }
  552. });
  553. e.WaitOne();
  554. }
  555. [Fact]
  556. public void RepeatingTimer_Start_CatchUp_Throws()
  557. {
  558. var end = new ManualResetEvent(false);
  559. var err = new Exception();
  560. var ex = default(Exception);
  561. var s = ThreadPoolScheduler.Instance.Catch<Exception>(e =>
  562. {
  563. Interlocked.Exchange(ref ex, e);
  564. end.Set();
  565. return true;
  566. });
  567. var xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10), s);
  568. xs.Subscribe(x =>
  569. {
  570. if (x == 0)
  571. {
  572. Thread.Sleep(500);
  573. }
  574. if (x == 5)
  575. {
  576. throw err;
  577. }
  578. });
  579. end.WaitOne();
  580. Assert.Same(err, ex);
  581. }
  582. #endif
  583. }
  584. internal class SchedulerWithCatch : IServiceProvider, IScheduler
  585. {
  586. private readonly IScheduler _scheduler;
  587. private readonly Action<Exception> _setException;
  588. public SchedulerWithCatch(IScheduler scheduler, Action<Exception> setException)
  589. {
  590. _scheduler = scheduler;
  591. _setException = setException;
  592. }
  593. public object GetService(Type serviceType)
  594. {
  595. return ((IServiceProvider)_scheduler).GetService(serviceType);
  596. }
  597. public DateTimeOffset Now
  598. {
  599. get { return _scheduler.Now; }
  600. }
  601. public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
  602. {
  603. return _scheduler.Schedule(state, GetCatch(action));
  604. }
  605. public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
  606. {
  607. return _scheduler.Schedule(state, dueTime, GetCatch(action));
  608. }
  609. public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
  610. {
  611. return _scheduler.Schedule(state, dueTime, GetCatch(action));
  612. }
  613. private Func<IScheduler, TState, IDisposable> GetCatch<TState>(Func<IScheduler, TState, IDisposable> action)
  614. {
  615. return (self, s) =>
  616. {
  617. try
  618. {
  619. return action(new SchedulerWithCatch(self, _setException), s);
  620. }
  621. catch (Exception ex)
  622. {
  623. _setException(ex);
  624. return Disposable.Empty;
  625. }
  626. };
  627. }
  628. }
  629. internal class PeriodicTestScheduler : TestScheduler, ISchedulerPeriodic, IServiceProvider
  630. {
  631. private readonly List<TimerRun> _timers;
  632. public PeriodicTestScheduler()
  633. {
  634. _timers = new List<TimerRun>();
  635. }
  636. public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
  637. {
  638. var run = new TimerRun(Clock);
  639. _timers.Add(run);
  640. var x = state;
  641. var d = this.Schedule(period, self =>
  642. {
  643. run.Add(Clock);
  644. x = action(x);
  645. self(period);
  646. });
  647. return new CompositeDisposable(
  648. Disposable.Create(() => { run.Stop(Clock); }),
  649. d
  650. );
  651. }
  652. public List<TimerRun> Timers
  653. {
  654. get { return _timers; }
  655. }
  656. protected override object GetService(Type serviceType)
  657. {
  658. if (serviceType == typeof(ISchedulerPeriodic))
  659. {
  660. return this as ISchedulerPeriodic;
  661. }
  662. return base.GetService(serviceType);
  663. }
  664. }
  665. internal class TimerRun : IEnumerable<long>
  666. {
  667. private readonly long _started;
  668. private long _stopped;
  669. private bool _hasStopped;
  670. private readonly List<long> _ticks;
  671. public TimerRun(long started)
  672. {
  673. _started = started;
  674. _ticks = new List<long>();
  675. }
  676. public TimerRun(long started, long stopped)
  677. {
  678. _started = started;
  679. _stopped = stopped;
  680. _hasStopped = true;
  681. _ticks = new List<long>();
  682. }
  683. public override int GetHashCode()
  684. {
  685. return 0;
  686. }
  687. public override bool Equals(object obj)
  688. {
  689. if (!(obj is TimerRun other))
  690. {
  691. return false;
  692. }
  693. return _started == other._started && _stopped == other._stopped && _ticks.SequenceEqual(other._ticks);
  694. }
  695. public long Started
  696. {
  697. get { return _started; }
  698. }
  699. public IEnumerable<long> Ticks
  700. {
  701. get { return _ticks; }
  702. }
  703. public long Stopped
  704. {
  705. get { return _stopped; }
  706. }
  707. internal void Stop(long clock)
  708. {
  709. _stopped = clock;
  710. _hasStopped = true;
  711. }
  712. public override string ToString()
  713. {
  714. var sb = new StringBuilder();
  715. sb.Append("Start(" + _started + ") ");
  716. sb.Append("Ticks(" + string.Join(", ", _ticks.Select(t => t.ToString()).ToArray()) + ") ");
  717. if (_hasStopped)
  718. {
  719. sb.Append("Stop(" + _stopped + ")");
  720. }
  721. return sb.ToString();
  722. }
  723. public IEnumerator<long> GetEnumerator()
  724. {
  725. return _ticks.GetEnumerator();
  726. }
  727. System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  728. {
  729. return _ticks.GetEnumerator();
  730. }
  731. public void Add(long clock)
  732. {
  733. _ticks.Add(clock);
  734. }
  735. }
  736. }