ObserveOnTest.cs 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Disposables;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using System.Threading;
  12. using Microsoft.Reactive.Testing;
  13. using ReactiveTests.Dummies;
  14. using Xunit;
  15. #if HAS_DISPATCHER
  16. using System.Windows.Threading;
  17. #endif
  18. #if HAS_WINFORMS
  19. using System.Windows.Forms;
  20. #endif
  21. namespace ReactiveTests.Tests
  22. {
  /// <summary>
  /// Tests for the <c>ObserveOn</c> overloads that target a WinForms control, a WPF dispatcher
  /// (both conditionally compiled) or a <see cref="SynchronizationContext"/>.
  /// </summary>
  public class ObserveOnTest : TestBase
  {
#region + TestBase +

      /// <summary>
      /// Checks that each <c>ObserveOn</c> overload throws <see cref="ArgumentNullException"/>
      /// when either the source or the target (scheduler, control, dispatcher, context) is null.
      /// </summary>
      [Fact]
      public void ObserveOn_ArgumentChecking()
      {
          var someObservable = Observable.Empty<int>();

#if HAS_WINFORMS
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(default(IObservable<int>), new ControlScheduler(new Label())));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(someObservable, default(ControlScheduler)));
          ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.ObserveOn<int>(default(IObservable<int>), new Label()));
          ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.ObserveOn<int>(someObservable, default(Label)));
#endif
#if HAS_DISPATCHER
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(default(IObservable<int>), new DispatcherScheduler(Dispatcher.CurrentDispatcher)));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(someObservable, default(DispatcherScheduler)));
          ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(default(IObservable<int>), Dispatcher.CurrentDispatcher));
          ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(someObservable, default(Dispatcher)));
          ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOnDispatcher<int>(default(IObservable<int>)));
#endif
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(default, new SynchronizationContext()));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn(someObservable, default(SynchronizationContext)));
      }

#if HAS_WINFORMS
      /// <summary>
      /// Produces ten values on <see cref="NewThreadScheduler"/>, observes them on a WinForms label and checks
      /// that every <c>OnNext</c> runs under a <see cref="System.Windows.Forms.WindowsFormsSynchronizationContext"/>.
      /// </summary>
      [Fact]
      public void ObserveOn_Control()
      {
          bool okay = true;

          using (WinFormsTestUtils.RunTest(out var lbl))
          {
              // Disposed when this block ends, i.e. only after WaitOne has returned.
              using var evt = new ManualResetEvent(false);

              Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOn(lbl).Subscribe(x =>
              {
                  lbl.Text = x.ToString();
                  okay &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
              }, () => evt.Set());

              evt.WaitOne();
          }

          Assert.True(okay);
      }

      /// <summary>
      /// Same check as <see cref="ObserveOn_Control"/>, but the label is wrapped in an explicit
      /// <c>ControlScheduler</c> before being passed to <c>ObserveOn</c>.
      /// </summary>
      [Fact]
      public void ObserveOn_ControlScheduler()
      {
          bool okay = true;

          using (WinFormsTestUtils.RunTest(out var lbl))
          {
              // Disposed when this block ends, i.e. only after WaitOne has returned.
              using var evt = new ManualResetEvent(false);

              Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOn(new ControlScheduler(lbl)).Subscribe(x =>
              {
                  lbl.Text = x.ToString();
                  okay &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
              }, () => evt.Set());

              evt.WaitOne();
          }

          Assert.True(okay);
      }
#endif

#if HAS_DISPATCHER
      /// <summary>
      /// Observes ten values on a dispatcher and checks that every <c>OnNext</c> runs under a
      /// <see cref="System.Windows.Threading.DispatcherSynchronizationContext"/>.
      /// </summary>
      [Fact]
      [Asynchronous]
      public void ObserveOn_Dispatcher()
      {
          using (DispatcherHelpers.RunTest(out var dispatcher))
          {
              RunAsync(evt =>
              {
                  bool okay = true;

                  Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOn(dispatcher).Subscribe(x =>
                  {
                      okay &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                  }, () =>
                  {
                      // NOTE(review): if this assertion fails it throws before evt.Set() runs;
                      // confirm that RunAsync surfaces the failure rather than waiting forever.
                      Assert.True(okay);
                      evt.Set();
                  });
              });
          }
      }

      /// <summary>
      /// Same check as <see cref="ObserveOn_Dispatcher"/>, but the dispatcher is wrapped in an explicit
      /// <c>DispatcherScheduler</c> before being passed to <c>ObserveOn</c>.
      /// </summary>
      [Fact]
      [Asynchronous]
      public void ObserveOn_DispatcherScheduler()
      {
          using (DispatcherHelpers.RunTest(out var dispatcher))
          {
              RunAsync(evt =>
              {
                  bool okay = true;

                  Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOn(new DispatcherScheduler(dispatcher)).Subscribe(x =>
                  {
                      okay &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                  }, () =>
                  {
                      Assert.True(okay);
                      evt.Set();
                  });
              });
          }
      }

      /// <summary>
      /// Builds the query with <c>ObserveOnDispatcher()</c> from inside a callback queued on the dispatcher
      /// and checks that every <c>OnNext</c> runs under a
      /// <see cref="System.Windows.Threading.DispatcherSynchronizationContext"/>.
      /// </summary>
      [Fact]
      [Asynchronous]
      public void ObserveOn_CurrentDispatcher()
      {
          using (DispatcherHelpers.RunTest(out var dispatcher))
          {
              RunAsync(evt =>
              {
                  bool okay = true;

                  // The subscription is set up from a dispatcher callback rather than from the test thread.
                  dispatcher.BeginInvoke(new Action(() =>
                  {
                      Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOnDispatcher().Subscribe(x =>
                      {
                          okay &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                      }, () =>
                      {
                          Assert.True(okay);
                          evt.Set();
                      });
                  }));
              });
          }
      }

      /// <summary>
      /// Checks that an error passed through <c>ObserveOnDispatcher()</c> reaches <c>OnError</c> as the
      /// same exception instance, and that <c>OnCompleted</c> is not invoked.
      /// </summary>
      [Fact]
      [Asynchronous]
      public void ObserveOn_Error()
      {
          using (DispatcherHelpers.RunTest(out var dispatcher))
          {
              RunAsync(evt =>
              {
                  var ex = new Exception();
                  bool okay = true;

                  dispatcher.BeginInvoke(new Action(() =>
                  {
                      Observable.Throw<int>(ex).ObserveOnDispatcher().Subscribe(x =>
                      {
                          okay &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                      },
                      e =>
                      {
                          Assert.True(okay);
                          Assert.Same(ex, e);
                          evt.Set();
                      },
                      () =>
                      {
                          // Completion is not expected for a sequence that only throws.
                          Assert.True(false);
                          evt.Set();
                      });
                  }));
              });
          }
      }
#endif

#endregion + TestBase +
  }
  /// <summary>
  /// Tests for <c>ObserveOn</c> with <see cref="IScheduler"/> and <see cref="SynchronizationContext"/> targets.
  /// Some tests run in virtual time on a <c>TestScheduler</c>; the others use real threads and therefore
  /// bound every blocking wait with <see cref="MaxWaitTime"/>.
  /// </summary>
  public class ObserveOnReactiveTest : ReactiveTest
  {
      /// <summary>Upper bound for blocking waits, so that a broken test fails instead of hanging the run.</summary>
      private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(10);

      /// <summary>
      /// Checks that the scheduler overload throws <see cref="ArgumentNullException"/> for a null source or a null scheduler.
      /// </summary>
      [Fact]
      public void ObserveOn_Scheduler_ArgumentChecking()
      {
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn(default(IObservable<int>), DummyScheduler.Instance));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn(DummyObservable<int>.Instance, default(IScheduler)));
      }

      /// <summary>
      /// Hot source that completes at 530: each message from 230 onwards is expected one tick later,
      /// including the completion at 531.
      /// </summary>
      [Fact]
      public void ObserveOn_Scheduler_Completed()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(90, 1),
              OnNext(120, 2),
              OnNext(230, 3),
              OnNext(240, 4),
              OnNext(310, 5),
              OnNext(470, 6),
              OnCompleted<int>(530)
          );

          var results = scheduler.Start(() =>
              xs.ObserveOn(scheduler)
          );

          results.Messages.AssertEqual(
              OnNext(231, 3),
              OnNext(241, 4),
              OnNext(311, 5),
              OnNext(471, 6),
              OnCompleted<int>(531)
          );

#if !NO_PERF
          // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
          xs.Subscriptions.AssertEqual(
              Subscribe(200, 531)
          );
#else
          //
          // TODO: Check platform discrepancies
          //
          //xs.Subscriptions.AssertEqual(
          //    Subscribe(200, 1000)
          //);
#endif
      }

      /// <summary>
      /// Hot source that fails at 530: each message from 230 onwards is expected one tick later,
      /// including the error (same exception instance) at 531.
      /// </summary>
      [Fact]
      public void ObserveOn_Scheduler_Error()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var xs = scheduler.CreateHotObservable(
              OnNext(90, 1),
              OnNext(120, 2),
              OnNext(230, 3),
              OnNext(240, 4),
              OnNext(310, 5),
              OnNext(470, 6),
              OnError<int>(530, ex)
          );

          var results = scheduler.Start(() =>
              xs.ObserveOn(scheduler)
          );

          results.Messages.AssertEqual(
              OnNext(231, 3),
              OnNext(241, 4),
              OnNext(311, 5),
              OnNext(471, 6),
              OnError<int>(531, ex)
          );

#if !NO_PERF
          // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
          xs.Subscriptions.AssertEqual(
              Subscribe(200, 531)
          );
#else
          //
          // TODO: Check platform discrepancies
          //
          //xs.Subscriptions.AssertEqual(
          //    Subscribe(200, 1000)
          //);
#endif
      }

      /// <summary>
      /// Hot source that never terminates: values are expected one tick late and the subscription
      /// is expected to span 200 to 1000.
      /// </summary>
      [Fact]
      public void ObserveOn_Scheduler_Dispose()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(90, 1),
              OnNext(120, 2),
              OnNext(230, 3),
              OnNext(240, 4),
              OnNext(310, 5),
              OnNext(470, 6)
          );

          var results = scheduler.Start(() =>
              xs.ObserveOn(scheduler)
          );

          results.Messages.AssertEqual(
              OnNext(231, 3),
              OnNext(241, 4),
              OnNext(311, 5),
              OnNext(471, 6)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      /// <summary>
      /// Two values produced at the same tick (210) are expected in order on consecutive ticks (211, 212).
      /// </summary>
      [Fact]
      public void ObserveOn_Scheduler_SameTime()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(210, 1),
              OnNext(210, 2)
          );

          var results = scheduler.Start(() =>
              xs.ObserveOn(scheduler)
          );

          results.Messages.AssertEqual(
              OnNext(211, 1),
              OnNext(212, 2)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      /// <summary>
      /// An exception thrown by the observer's <c>OnNext</c> is expected to surface inside the scheduled
      /// action, where <see cref="MyScheduler"/> records it and signals the event.
      /// </summary>
      [Fact]
      public void ObserveOn_Scheduler_OnNextThrows()
      {
          var e = new ManualResetEvent(false);

          var scheduler = new MyScheduler(e);

          Observable.Range(0, 10, Scheduler.Default).ObserveOn(scheduler).Subscribe(
              x =>
              {
                  if (x == 5)
                  {
                      throw new Exception();
                  }
              }
          );

          // Bounded wait: if the exception never reaches the scheduler the test fails instead of hanging.
          Assert.True(e.WaitOne(MaxWaitTime), "Timeout!");

          Assert.NotNull(scheduler._exception);
      }

      /// <summary>
      /// Scheduler that runs immediate work inline and captures any exception thrown by the action.
      /// The timed <c>Schedule</c> overloads and <see cref="Now"/> are not implemented.
      /// </summary>
      private class MyScheduler : IScheduler
      {
          /// <summary>The exception caught from a scheduled action, if any.</summary>
          internal Exception _exception;

          private readonly ManualResetEvent _evt;

          /// <param name="e">Event that is set once an exception has been captured.</param>
          public MyScheduler(ManualResetEvent e)
          {
              _evt = e;
          }

          public DateTimeOffset Now
          {
              get { throw new NotImplementedException(); }
          }

          /// <summary>
          /// Invokes <paramref name="action"/> synchronously. If it throws, the exception is stored in
          /// <see cref="_exception"/>, the event is set and an empty disposable is returned.
          /// </summary>
          public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
          {
              try
              {
                  return action(this, state);
              }
              catch (Exception ex)
              {
                  _exception = ex;
                  _evt.Set();
                  return Disposable.Empty;
              }
          }

          public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
          {
              throw new NotImplementedException();
          }

          public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
          {
              throw new NotImplementedException();
          }
      }

#if !NO_PERF
      /// <summary>
      /// Three values followed by completion, pushed through a <c>TestLongRunningScheduler</c>,
      /// are expected to arrive in order.
      /// </summary>
      [Fact]
      public void ObserveOn_LongRunning_Simple()
      {
          var started = default(ManualResetEvent);
          var stopped = default(ManualResetEvent);

          var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

          var s = new Subject<int>();

          var end = new ManualResetEvent(false);
          var lst = new List<int>();
          s.ObserveOn(scheduler).Subscribe(lst.Add, () => end.Set());

          s.OnNext(1);
          s.OnNext(2);
          s.OnNext(3);
          s.OnCompleted();

          Assert.True(end.WaitOne(MaxWaitTime), "Timeout!");

          Assert.Equal(new[] { 1, 2, 3 }, lst);
      }

      /// <summary>
      /// An error pushed after three values through a <c>TestLongRunningScheduler</c> is expected
      /// to reach <c>OnError</c> as the same exception instance.
      /// </summary>
      [Fact]
      public void ObserveOn_LongRunning_Error()
      {
          var started = default(ManualResetEvent);
          var stopped = default(ManualResetEvent);

          var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

          var s = new Subject<int>();

          var end = new ManualResetEvent(false);
          var err = default(Exception);
          s.ObserveOn(scheduler).Subscribe(_ => { }, ex => { err = ex; end.Set(); });

          s.OnNext(1);
          s.OnNext(2);
          s.OnNext(3);

          var ex_ = new Exception();
          s.OnError(ex_);

          Assert.True(end.WaitOne(MaxWaitTime), "Timeout!");

          Assert.Same(ex_, err);
      }

#if !NO_THREAD
      /// <summary>
      /// Pushes a thousand values with short pauses every hundred items and expects completion to be delivered.
      /// </summary>
      [Fact]
      public void ObserveOn_LongRunning_TimeVariance()
      {
          var started = default(ManualResetEvent);
          var stopped = default(ManualResetEvent);

          var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

          var s = new Subject<int>();

          var end = new ManualResetEvent(false);
          s.ObserveOn(scheduler).Subscribe(_ => { }, () => end.Set());

          s.OnNext(1); // Ensure active

          Assert.True(started.WaitOne(MaxWaitTime), "Timeout!");
          Thread.Sleep(100); // Try to enter the dispatcher event wait state

          for (var i = 0; i < 1000; i++)
          {
              if (i % 100 == 0)
              {
                  Thread.Sleep(10);
              }

              s.OnNext(i);
          }

          s.OnCompleted();

          Assert.True(end.WaitOne(MaxWaitTime), "Timeout!");
      }
#endif

      /// <summary>
      /// Blocks the observer inside its first <c>OnNext</c>, pushes two more values and an error meanwhile,
      /// then releases it. All three values and then the error are expected to be delivered.
      /// </summary>
      [Fact]
      public void ObserveOn_LongRunning_HoldUpDuringDispatchAndFail()
      {
          var started = default(ManualResetEvent);
          var stopped = default(ManualResetEvent);

          var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

          var s = new Subject<int>();

          var onNext = new ManualResetEvent(false);
          var resume = new ManualResetEvent(false);
          var lst = new List<int>();
          var err = default(Exception);
          var end = new ManualResetEvent(false);
          s.ObserveOn(scheduler).Subscribe(x => { lst.Add(x); onNext.Set(); resume.WaitOne(); }, ex_ => { err = ex_; end.Set(); });

          var ex = new Exception();

          try
          {
              s.OnNext(1);
              Assert.True(onNext.WaitOne(MaxWaitTime), "Timeout!");

              s.OnNext(2);
              s.OnNext(3);
              s.OnError(ex);
          }
          finally
          {
              // Always release the observer blocked in resume.WaitOne(), even if the wait above timed out.
              resume.Set();
          }

          Assert.True(end.WaitOne(MaxWaitTime), "Timeout!");

          Assert.Equal(new[] { 1, 2, 3 }, lst);
          Assert.Same(ex, err);
      }

      /// <summary>
      /// Disposes the subscription after at least one value has been observed and expects that a value
      /// pushed after the scheduler signalled <c>stopped</c> is not delivered.
      /// </summary>
      [Fact]
      public void ObserveOn_LongRunning_Cancel()
      {
          var started = default(ManualResetEvent);
          var stopped = default(ManualResetEvent);

          var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

          var s = new Subject<int>();

          var lst = new List<int>();
          var running = new ManualResetEvent(false);
          var d = s.ObserveOn(scheduler).Subscribe(x => { lst.Add(x); running.Set(); });

          s.OnNext(0);
          Assert.True(started.WaitOne(MaxWaitTime), "Timeout!");

          s.OnNext(1);
          s.OnNext(2);
          s.OnNext(3);
          Assert.True(running.WaitOne(MaxWaitTime), "Timeout!");

          d.Dispose();
          Assert.True(stopped.WaitOne(MaxWaitTime), "Timeout!");

          s.OnNext(4);

          Assert.NotEmpty(lst);
          Assert.DoesNotContain(4, lst);
      }

      /// <summary>
      /// The observer throws on the value 3; the exception is expected to be reported through the
      /// scheduler's exception callback by the time <c>stopped</c> is signalled.
      /// </summary>
      [Fact]
      public void ObserveOn_LongRunning_OnNextThrows()
      {
          var started = default(ManualResetEvent);
          var stopped = default(ManualResetEvent);
          var exception = default(Exception);

          var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e, ex => exception = ex);

          var s = new Subject<int>();

          var lst = new List<int>();
          var running = new ManualResetEvent(false);
          s.ObserveOn(scheduler).Subscribe(x => { lst.Add(x); running.Set(); if (x == 3) { throw new Exception(); } });

          s.OnNext(0);
          Assert.True(started.WaitOne(MaxWaitTime), "Timeout!");

          s.OnNext(1);
          s.OnNext(2);
          s.OnNext(3);
          Assert.True(running.WaitOne(MaxWaitTime), "Timeout!");

          s.OnNext(4);
          Assert.True(stopped.WaitOne(MaxWaitTime), "Timeout!");

          Assert.NotNull(exception);
      }
#endif

      /// <summary>
      /// Same scenario as <see cref="ObserveOn_Scheduler_Completed"/>, but observing through a
      /// <see cref="SynchronizationContext"/> (<see cref="MyCtx"/>) that posts onto the test scheduler.
      /// </summary>
      [Fact]
      public void ObserveOn_SynchronizationContext_Simple()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(90, 1),
              OnNext(120, 2),
              OnNext(230, 3),
              OnNext(240, 4),
              OnNext(310, 5),
              OnNext(470, 6),
              OnCompleted<int>(530)
          );

          var results = scheduler.Start(() =>
              xs.ObserveOn(new MyCtx(scheduler))
          );

          results.Messages.AssertEqual(
              OnNext(231, 3),
              OnNext(241, 4),
              OnNext(311, 5),
              OnNext(471, 6),
              OnCompleted<int>(531)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 531)
          );
      }

      /// <summary>
      /// Pushes one million values through an <see cref="EventLoopScheduler"/> and expects completion
      /// within <see cref="MaxWaitTime"/>.
      /// </summary>
      [Fact]
      public void ObserveOn_EventLoop_Long()
      {
          using var scheduler = new EventLoopScheduler();
          const int N = 1_000_000;

          var cde = new CountdownEvent(1);

          Observable.Range(1, N).ObserveOn(scheduler)
              .Subscribe(v => { }, () => cde.Signal());

          Assert.True(cde.Wait(MaxWaitTime), "Timeout!");
      }

      /// <summary>
      /// With a scheduler that exposes a long-running facet, all one million values are expected to be
      /// observed on a single managed thread.
      /// </summary>
      [Fact]
      public void ObserveOn_LongRunning_SameThread()
      {
          var scheduler = TaskPoolScheduler.Default;

          Assert.NotNull(scheduler.AsLongRunning());

          const int N = 1_000_000;
          var threads = new HashSet<int>();
          var cde = new CountdownEvent(1);

          Observable.Range(1, N)
              .ObserveOn(scheduler)
              .Subscribe(
                  v => threads.Add(Thread.CurrentThread.ManagedThreadId),
                  e => cde.Signal(), // signal on error too, so a failure is reported without waiting for the timeout
                  () => cde.Signal()
              );

          Assert.True(cde.Wait(MaxWaitTime), "Timeout!");

          Assert.Single(threads);
      }

      /// <summary>
      /// With optimizations disabled the scheduler exposes no long-running facet; the sequence is still
      /// expected to terminate within <see cref="MaxWaitTime"/> having been observed on at least one thread.
      /// </summary>
      [Fact]
      public void ObserveOn_LongRunning_DisableOptimizations()
      {
          var scheduler = TaskPoolScheduler.Default.DisableOptimizations();

          Assert.Null(scheduler.AsLongRunning());

          const int N = 1_000_000;
          var threads = new HashSet<int>();
          var cde = new CountdownEvent(1);

          Observable.Range(1, N)
              .ObserveOn(scheduler)
              .Subscribe(
                  v => threads.Add(Thread.CurrentThread.ManagedThreadId),
                  e => cde.Signal(), // signal on error too, so a failure is reported without waiting for the timeout
                  () => cde.Signal()
              );

          Assert.True(cde.Wait(MaxWaitTime), "Timeout!");

          Assert.NotEmpty(threads);
      }
  }
  /// <summary>
  /// A <see cref="SynchronizationContext"/> whose <see cref="Post"/> forwards the callback to an
  /// <see cref="IScheduler"/> instead of running it itself.
  /// </summary>
  internal class MyCtx : SynchronizationContext
  {
      private readonly IScheduler _scheduler;

      /// <param name="scheduler">Scheduler that posted callbacks are scheduled on.</param>
      /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
      public MyCtx(IScheduler scheduler)
      {
          // Fail at construction rather than with a NullReferenceException on the first Post.
          _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler));
      }

      /// <summary>
      /// Schedules <paramref name="d"/> with <paramref name="state"/> on the wrapped scheduler.
      /// The disposable returned by the scheduler is not retained.
      /// </summary>
      public override void Post(SendOrPostCallback d, object state)
      {
          _scheduler.Schedule(state, (self, s) => { d(s); return Disposable.Empty; });
      }
  }
  573. }