ObserveOnTest.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class ObserveOnTest : TestBase
  22. {
  23. #region + TestBase +
  // Exercises the null-argument guards of every ObserveOn overload compiled for the
  // current platform: each call is expected to throw ArgumentNullException, once for a
  // null source and once for a null scheduling target.
  [Fact]
  public void ObserveOn_ArgumentChecking()
  {
      var nullSource = default(IObservable<int>);
      var validSource = Observable.Empty<int>();

#if HAS_WINFORMS
      // Windows Forms: ControlScheduler-based overload, then the Control-based overload.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new ControlScheduler(new Label())));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(validSource, default(ControlScheduler)));

      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.ObserveOn<int>(nullSource, new Label()));
      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.ObserveOn<int>(validSource, default(Label)));
#endif

#if HAS_DISPATCHER
      // Dispatcher: DispatcherScheduler-based overload.
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new DispatcherScheduler(System.Windows.Deployment.Current.Dispatcher)));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new DispatcherScheduler(Dispatcher.CurrentDispatcher)));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(validSource, default(DispatcherScheduler)));

      // Dispatcher: Dispatcher-based overloads.
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(nullSource, System.Windows.Deployment.Current.Dispatcher));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(nullSource, Dispatcher.CurrentDispatcher));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(validSource, default(Dispatcher)));
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOnDispatcher<int>(nullSource));
#endif

      // SynchronizationContext overload is checked on every platform.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new SynchronizationContext()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(validSource, default(SynchronizationContext)));
  }
  52. #if HAS_WINFORMS
  // Observes a range produced on a new thread through ObserveOn(Control) and checks that
  // every OnNext callback ran under a WindowsFormsSynchronizationContext.
  [Fact]
  public void ObserveOn_Control()
  {
      var label = CreateLabel();
      var completed = new ManualResetEvent(false);
      var onUiContext = true;

      var source = Observable.Range(0, 10, NewThreadScheduler.Default);
      var marshalled = source.ObserveOn(label);

      marshalled.Subscribe(
          value =>
          {
              // Touching the control would fail if we were not on its thread.
              label.Text = value.ToString();

              if (!(SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext))
              {
                  onUiContext = false;
              }
          },
          () => completed.Set());

      // Wait for OnCompleted, then stop the message loop started by CreateLabel.
      completed.WaitOne();
      Application.Exit();

      Assert.True(onUiContext);
  }
  // Same scenario as the Control-based test, but the target is wrapped in an explicit
  // ControlScheduler; every OnNext must run under a WindowsFormsSynchronizationContext.
  [Fact]
  public void ObserveOn_ControlScheduler()
  {
      var label = CreateLabel();
      var completed = new ManualResetEvent(false);
      var onUiContext = true;

      var uiScheduler = new ControlScheduler(label);
      var source = Observable.Range(0, 10, NewThreadScheduler.Default);
      var marshalled = source.ObserveOn(uiScheduler);

      marshalled.Subscribe(
          value =>
          {
              label.Text = value.ToString();

              if (!(SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext))
              {
                  onUiContext = false;
              }
          },
          () => completed.Set());

      // Wait for OnCompleted, then stop the message loop started by CreateLabel.
      completed.WaitOne();
      Application.Exit();

      Assert.True(onUiContext);
  }
  // Creates a Label hosted in an invisible, zero-sized Form whose message loop runs on a
  // dedicated STA thread. Returns only after the form's Load event has fired.
  private Label CreateLabel()
  {
      var formLoaded = new ManualResetEvent(false);
      Label label = null;

      ThreadStart runMessageLoop = () =>
      {
          label = new Label();

          var form = new Form();
          form.Controls.Add(label);
          form.Width = 0;
          form.Height = 0;
          form.FormBorderStyle = FormBorderStyle.None;
          form.ShowInTaskbar = false;

          form.Load += delegate
          {
              formLoaded.Set();
          };

          // Blocks this thread until the message loop is exited.
          Application.Run(form);
      };

      var uiThread = new Thread(runMessageLoop);
      uiThread.SetApartmentState(ApartmentState.STA);
      uiThread.Start();

      formLoaded.WaitOne();
      return label;
  }
  102. #endif
  103. #if HAS_DISPATCHER
  // Observes a range produced on a new thread through ObserveOn(dispatcher) and checks
  // that every OnNext callback ran under a DispatcherSynchronizationContext.
  [Fact]
  [Asynchronous]
  public void ObserveOn_Dispatcher()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var onDispatcherContext = true;

          var source = Observable.Range(0, 10, NewThreadScheduler.Default);
          var marshalled = source.ObserveOn(dispatcher);

          marshalled.Subscribe(
              value =>
              {
                  if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                  {
                      onDispatcherContext = false;
                  }
              },
              () =>
              {
                  Assert.True(onDispatcherContext);
                  dispatcher.InvokeShutdown();
                  done.Set();
              });
      });
  }
  // Same scenario as the Dispatcher-based test, but the dispatcher is wrapped in an
  // explicit DispatcherScheduler before being handed to ObserveOn.
  [Fact]
  [Asynchronous]
  public void ObserveOn_DispatcherScheduler()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var onDispatcherContext = true;

          var dispatcherScheduler = new DispatcherScheduler(dispatcher);
          var source = Observable.Range(0, 10, NewThreadScheduler.Default);
          var marshalled = source.ObserveOn(dispatcherScheduler);

          marshalled.Subscribe(
              value =>
              {
                  if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                  {
                      onDispatcherContext = false;
                  }
              },
              () =>
              {
                  Assert.True(onDispatcherContext);
                  dispatcher.InvokeShutdown();
                  done.Set();
              });
      });
  }
  // Builds the subscription from inside a dispatcher work item so that the parameterless
  // ObserveOnDispatcher() is called on the dispatcher's own thread, then checks that every
  // OnNext callback ran under a DispatcherSynchronizationContext.
  [Fact]
  [Asynchronous]
  public void ObserveOn_CurrentDispatcher()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var onDispatcherContext = true;

          var subscribeOnDispatcher = new Action(() =>
          {
              var source = Observable.Range(0, 10, NewThreadScheduler.Default);
              var marshalled = source.ObserveOnDispatcher();

              marshalled.Subscribe(
                  value =>
                  {
                      if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                      {
                          onDispatcherContext = false;
                      }
                  },
                  () =>
                  {
                      Assert.True(onDispatcherContext);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  });
          });

          dispatcher.BeginInvoke(subscribeOnDispatcher);
      });
  }
  // Subscribes to a sequence that fails immediately, marshalled via ObserveOnDispatcher(),
  // and checks that the very same exception instance reaches OnError. OnCompleted is not
  // expected and fails the test if it is ever invoked.
  [Fact]
  [Asynchronous]
  public void ObserveOn_Error()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var expected = new Exception();
          var onDispatcherContext = true;

          var subscribeOnDispatcher = new Action(() =>
          {
              var failing = Observable.Throw<int>(expected);
              var marshalled = failing.ObserveOnDispatcher();

              marshalled.Subscribe(
                  value =>
                  {
                      if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                      {
                          onDispatcherContext = false;
                      }
                  },
                  error =>
                  {
                      Assert.True(onDispatcherContext);
                      Assert.Same(expected, error);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  },
                  () =>
                  {
                      // NOTE(review): the two statements below only run if this assertion
                      // does not throw, i.e. they look unreachable as written.
                      Assert.True(false);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  });
          });

          dispatcher.BeginInvoke(subscribeOnDispatcher);
      });
  }
  195. #endif
  196. #endregion + TestBase +
  197. }
  198. public class ObserveOnReactiveTest : ReactiveTest
  199. {
  // The IScheduler overload of ObserveOn is expected to throw ArgumentNullException for a
  // null source and for a null scheduler.
  [Fact]
  public void ObserveOn_Scheduler_ArgumentChecking()
  {
      var nullSource = default(IObservable<int>);
      var nullScheduler = default(IScheduler);

      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.ObserveOn(nullSource, DummyScheduler.Instance));

      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.ObserveOn(DummyObservable<int>.Instance, nullScheduler));
  }
  // A hot source that completes at 530 is observed on the test scheduler. The expected
  // recording shows each notification from 230 onwards one tick after it was produced,
  // ending with completion at 531.
  [Fact]
  public void ObserveOn_Scheduler_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      Func<IObservable<int>> create = () => source.ObserveOn(testScheduler);
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6),
          OnCompleted<int>(531)
      );

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      source.Subscriptions.AssertEqual(
          Subscribe(200, 531)
      );
#else
      //
      // TODO: Check platform discrepancies
      //
      //source.Subscriptions.AssertEqual(
      //    Subscribe(200, 1000)
      //);
#endif
  }
  // A hot source that fails at 530 is observed on the test scheduler. The expected
  // recording shows each notification from 230 onwards one tick after it was produced,
  // ending with the same exception at 531.
  [Fact]
  public void ObserveOn_Scheduler_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnError<int>(530, failure)
      );

      Func<IObservable<int>> create = () => source.ObserveOn(testScheduler);
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6),
          OnError<int>(531, failure)
      );

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      source.Subscriptions.AssertEqual(
          Subscribe(200, 531)
      );
#else
      //
      // TODO: Check platform discrepancies
      //
      //source.Subscriptions.AssertEqual(
      //    Subscribe(200, 1000)
      //);
#endif
  }
  // A hot source that never terminates is observed on the test scheduler. The expected
  // recording contains the values from 230 onwards, each one tick later, and a source
  // subscription spanning 200 to 1000.
  [Fact]
  public void ObserveOn_Scheduler_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6)
      );

      Func<IObservable<int>> create = () => source.ObserveOn(testScheduler);
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // Two values produced at the same virtual time (210) are expected to be recorded on
  // consecutive ticks (211 and 212), in their original order.
  [Fact]
  public void ObserveOn_Scheduler_SameTime()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(210, 2)
      );

      Func<IObservable<int>> create = () => source.ObserveOn(testScheduler);
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(211, 1),
          OnNext(212, 2)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // The observer throws while handling the value 5. MyScheduler catches whatever escapes
  // the scheduled action, stores it and signals the event, so the test waits on that
  // event and then checks that an exception was captured.
  [Fact]
  public void ObserveOn_Scheduler_OnNextThrows()
  {
      var exceptionCaptured = new ManualResetEvent(false);
      var capturingScheduler = new MyScheduler(exceptionCaptured);

      var source = Observable.Range(0, 10, Scheduler.Default);
      var marshalled = source.ObserveOn(capturingScheduler);

      marshalled.Subscribe(value =>
      {
          if (value == 5)
          {
              throw new Exception();
          }
      });

      exceptionCaptured.WaitOne();

      Assert.NotNull(capturingScheduler._exception);
  }
  // Minimal IScheduler used by ObserveOn_Scheduler_OnNextThrows. Immediate work is run
  // inline on the calling thread; any exception escaping it is stored in _exception and
  // the event supplied to the constructor is signalled. Time-based scheduling and Now are
  // deliberately not implemented.
  class MyScheduler : IScheduler
  {
      // Exception caught by the immediate Schedule overload; stays null until one is caught.
      internal Exception _exception;

      // Signalled after an exception has been captured. Assigned only in the constructor.
      private readonly ManualResetEvent _evt;

      // e: event to set once an exception thrown by scheduled work has been recorded.
      public MyScheduler(ManualResetEvent e)
      {
          _evt = e;
      }

      // Not supported by this test scheduler.
      public DateTimeOffset Now
      {
          get { throw new NotImplementedException(); }
      }

      // Runs the action synchronously and returns its disposable. If the action throws,
      // the exception is recorded, the event is set and an empty disposable is returned
      // instead of letting the exception propagate to the caller.
      public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
      {
          try
          {
              return action(this, state);
          }
          catch (Exception ex)
          {
              _exception = ex;
              _evt.Set();
              return Disposable.Empty;
          }
      }

      // Relative-time scheduling is not supported; always throws NotImplementedException.
      public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          throw new NotImplementedException();
      }

      // Absolute-time scheduling is not supported; always throws NotImplementedException.
      public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          throw new NotImplementedException();
      }
  }
  374. #if !NO_PERF
  // Pushes 1, 2, 3 and completion through ObserveOn on a TestLongRunningScheduler and
  // checks that the observer received exactly those values, in order.
  [Fact]
  public void ObserveOn_LongRunning_Simple()
  {
      // The scheduler hands its events to these callbacks; this test does not wait on them.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

      var s = new Subject<int>();
      var end = new ManualResetEvent(false);
      var lst = new List<int>();

      s.ObserveOn(scheduler).Subscribe(lst.Add, () => end.Set());

      s.OnNext(1);
      s.OnNext(2);
      s.OnNext(3);
      s.OnCompleted();

      // OnCompleted is delivered after all queued values, so lst is complete once end is set.
      end.WaitOne();

      // Collection assertion reports expected vs. actual contents on failure, unlike
      // Assert.True(lst.SequenceEqual(...)), which only reports "False".
      Assert.Equal(new[] { 1, 2, 3 }, lst);
  }
  // Pushes three values followed by an error through ObserveOn on a
  // TestLongRunningScheduler and checks that the observer's OnError receives the same
  // exception instance.
  [Fact]
  public void ObserveOn_LongRunning_Error()
  {
      // The scheduler hands its events to these callbacks; this test does not wait on them.
      ManualResetEvent schedulerStarted = null;
      ManualResetEvent schedulerStopped = null;
      var longRunning = new TestLongRunningScheduler(e => schedulerStarted = e, e => schedulerStopped = e);

      var subject = new Subject<int>();
      var errorSeen = new ManualResetEvent(false);
      Exception observed = null;

      var marshalled = subject.ObserveOn(longRunning);
      marshalled.Subscribe(
          _ => { },
          error =>
          {
              observed = error;
              errorSeen.Set();
          });

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnNext(3);

      var thrown = new Exception();
      subject.OnError(thrown);

      errorSeen.WaitOne();

      Assert.Same(thrown, observed);
  }
  410. #if !NO_THREAD
  // Feeds 1000 values with short pauses every 100 items so that the consumer alternates
  // between busy and idle, then checks that completion is still delivered. The test
  // passes simply by not hanging on the final wait.
  [Fact]
  public void ObserveOn_LongRunning_TimeVariance()
  {
      ManualResetEvent schedulerStarted = null;
      ManualResetEvent schedulerStopped = null;
      var longRunning = new TestLongRunningScheduler(e => schedulerStarted = e, e => schedulerStopped = e);

      var subject = new Subject<int>();
      var completed = new ManualResetEvent(false);

      var marshalled = subject.ObserveOn(longRunning);
      marshalled.Subscribe(_ => { }, () => completed.Set());

      subject.OnNext(1); // Ensure active
      schedulerStarted.WaitOne();

      Thread.Sleep(100); // Try to enter the dispatcher event wait state

      foreach (var value in Enumerable.Range(0, 1000))
      {
          if (value % 100 == 0)
          {
              Thread.Sleep(10);
          }

          subject.OnNext(value);
      }

      subject.OnCompleted();

      completed.WaitOne();
  }
  432. #endif
  // Blocks the observer inside its first OnNext, queues two more values and an error
  // behind it, then releases the observer. The queued values must still be delivered in
  // order before the error, and the error must be the same exception instance.
  [Fact]
  public void ObserveOn_LongRunning_HoldUpDuringDispatchAndFail()
  {
      // The scheduler hands its events to these callbacks; this test does not wait on them.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

      var s = new Subject<int>();
      var onNext = new ManualResetEvent(false);  // set each time a value has been received
      var resume = new ManualResetEvent(false);  // releases the observer held inside OnNext
      var lst = new List<int>();
      var err = default(Exception);
      var end = new ManualResetEvent(false);

      s.ObserveOn(scheduler).Subscribe(
          x =>
          {
              lst.Add(x);
              onNext.Set();
              resume.WaitOne();
          },
          ex_ =>
          {
              err = ex_;
              end.Set();
          });

      s.OnNext(1);
      onNext.WaitOne(); // observer is now parked in OnNext(1)

      s.OnNext(2);
      s.OnNext(3);
      var ex = new Exception();
      s.OnError(ex);

      resume.Set();
      end.WaitOne();

      // Collection assertion reports expected vs. actual contents on failure, unlike
      // Assert.True(lst.SequenceEqual(...)), which only reports "False".
      Assert.Equal(new[] { 1, 2, 3 }, lst);
      Assert.Same(ex, err);
  }
  // Disposes the subscription while values are being dispatched and checks that a value
  // pushed after the scheduler reported "stopped" is never delivered.
  [Fact]
  public void ObserveOn_LongRunning_Cancel()
  {
      // Events handed out by the scheduler; presumably signalled when its long-running
      // work starts and stops - confirm against TestLongRunningScheduler.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

      var s = new Subject<int>();
      var lst = new List<int>();
      var running = new ManualResetEvent(false); // set once at least one value was received

      var d = s.ObserveOn(scheduler).Subscribe(x => { lst.Add(x); running.Set(); });

      s.OnNext(0);
      started.WaitOne();

      s.OnNext(1);
      s.OnNext(2);
      s.OnNext(3);
      running.WaitOne();

      d.Dispose();
      stopped.WaitOne();

      s.OnNext(4);

      // Separate assertions so a failure says which expectation broke and shows the list.
      Assert.NotEmpty(lst);
      Assert.DoesNotContain(4, lst);
  }
  // The observer throws while handling the value 3. The exception is expected to reach
  // the callback passed as the scheduler's third constructor argument, and the scheduler
  // is expected to signal its "stopped" event afterwards.
  [Fact]
  public void ObserveOn_LongRunning_OnNextThrows()
  {
      // Events handed out by the scheduler; presumably signalled when its long-running
      // work starts and stops - confirm against TestLongRunningScheduler.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var exception = default(Exception);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e, ex => exception = ex);

      var s = new Subject<int>();
      var lst = new List<int>();
      var running = new ManualResetEvent(false); // set once at least one value was received

      s.ObserveOn(scheduler).Subscribe(x =>
      {
          lst.Add(x);
          running.Set();

          if (x == 3)
          {
              throw new Exception();
          }
      });

      s.OnNext(0);
      started.WaitOne();

      s.OnNext(1);
      s.OnNext(2);
      s.OnNext(3);
      running.WaitOne();

      s.OnNext(4);
      stopped.WaitOne();

      Assert.NotNull(exception);
  }
  501. #endif
  // Observes a hot source through the SynchronizationContext overload, using MyCtx to
  // forward posted callbacks to the test scheduler. The expected recording shows each
  // notification from 230 onwards one tick later, and the source subscription ending at 531.
  [Fact]
  public void ObserveOn_SynchronizationContext_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      Func<IObservable<int>> create = () => source.ObserveOn(new MyCtx(testScheduler));
      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6),
          OnCompleted<int>(531)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 531)
      );
  }
  529. }
  // SynchronizationContext that forwards every posted callback to an IScheduler, so tests
  // can drive the SynchronizationContext overload of ObserveOn with a scheduler they control.
  internal class MyCtx : SynchronizationContext
  {
      // Scheduler that runs posted callbacks. Assigned only in the constructor.
      private readonly IScheduler scheduler;

      // scheduler: target on which callbacks passed to Post are scheduled.
      public MyCtx(IScheduler scheduler)
      {
          this.scheduler = scheduler;
      }

      // Schedules d(state) on the wrapped scheduler. The disposable returned by Schedule
      // is not kept, so a posted callback cannot be cancelled through this context.
      public override void Post(SendOrPostCallback d, object state)
      {
          scheduler.Schedule(state, (self, s) => { d(s); return Disposable.Empty; });
      }
  }
  542. }