1
0

ObservableConcurrencyTest.cs 34 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Disposables;
  10. using System.Reactive.Linq;
  11. using System.Reactive.Subjects;
  12. using System.Threading;
  13. #if NET45 || DNX451
  14. using System.Windows.Threading;
  15. #endif
  16. using Microsoft.Reactive.Testing;
  17. using Xunit;
  18. using ReactiveTests.Dummies;
  19. #if HAS_WINFORMS
  20. using System.Windows.Forms;
  21. #endif
  22. #if SILVERLIGHT && !SILVERLIGHTM7
  23. using Microsoft.Silverlight.Testing;
  24. #endif
  25. namespace ReactiveTests.Tests
  26. {
  27. public partial class ObservableConcurrencyTest : TestBase
  28. {
  29. #region + ObserveOn +
  /// <summary>
  /// Verifies that the ObserveOn overloads targeting a WinForms control, a dispatcher or a
  /// <see cref="SynchronizationContext"/> throw <see cref="ArgumentNullException"/> when either
  /// the source or the target is null. Framework-specific overloads are only exercised when
  /// the matching compilation symbol is defined.
  /// </summary>
  [Fact]
  public void ObserveOn_ArgumentChecking()
  {
      var source = Observable.Empty<int>();
      var nullSource = default(IObservable<int>);

#if HAS_WINFORMS
      // WinForms: scheduler-based and control-based overloads.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new ControlScheduler(new Label())));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(source, default(ControlScheduler)));

      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.ObserveOn<int>(nullSource, new Label()));
      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.ObserveOn<int>(source, default(Label)));
#endif

#if HAS_DISPATCHER
      // Dispatcher: scheduler-based overloads.
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new DispatcherScheduler(System.Windows.Deployment.Current.Dispatcher)));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new DispatcherScheduler(Dispatcher.CurrentDispatcher)));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(source, default(DispatcherScheduler)));

      // Dispatcher: dispatcher-based overloads.
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(nullSource, System.Windows.Deployment.Current.Dispatcher));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(nullSource, Dispatcher.CurrentDispatcher));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(source, default(Dispatcher)));
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOnDispatcher<int>(nullSource));
#endif

      // SynchronizationContext overload.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new SynchronizationContext()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(source, default(SynchronizationContext)));
  }
  58. #if HAS_WINFORMS
  /// <summary>
  /// Observes a range produced on a new thread via a WinForms label and checks that every
  /// OnNext callback ran with a <see cref="System.Windows.Forms.WindowsFormsSynchronizationContext"/>
  /// as the current synchronization context.
  /// </summary>
  [Fact]
  public void ObserveOn_Control()
  {
      var label = CreateLabel();
      var completed = new ManualResetEvent(false);
      var onUiContext = true;

      Action<int> onNext = value =>
      {
          label.Text = value.ToString();
          onUiContext &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
      };
      Action onCompleted = () => completed.Set();

      Observable.Range(0, 10, NewThreadScheduler.Default)
          .ObserveOn(label)
          .Subscribe(onNext, onCompleted);

      // Block until the sequence has completed, then shut the message loop down.
      completed.WaitOne();
      Application.Exit();

      Assert.True(onUiContext);
  }
  /// <summary>
  /// Same scenario as <c>ObserveOn_Control</c>, but the label is wrapped in an explicit
  /// <c>ControlScheduler</c> before being handed to ObserveOn.
  /// </summary>
  [Fact]
  public void ObserveOn_ControlScheduler()
  {
      var label = CreateLabel();
      var completed = new ManualResetEvent(false);
      var onUiContext = true;

      Action<int> onNext = value =>
      {
          label.Text = value.ToString();
          onUiContext &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
      };
      Action onCompleted = () => completed.Set();

      Observable.Range(0, 10, NewThreadScheduler.Default)
          .ObserveOn(new ControlScheduler(label))
          .Subscribe(onNext, onCompleted);

      // Block until the sequence has completed, then shut the message loop down.
      completed.WaitOne();
      Application.Exit();

      Assert.True(onUiContext);
  }
  /// <summary>
  /// Starts an STA thread that runs a message loop for a zero-sized, borderless form hosting a
  /// single label, and returns that label once the form's Load event has fired.
  /// </summary>
  /// <returns>The label created on the message-loop thread.</returns>
  private Label CreateLabel()
  {
      var formLoaded = new ManualResetEvent(false);
      Label label = null;

      var uiThread = new Thread(() =>
      {
          label = new Label();

          var form = new Form
          {
              Controls = { label },
              Width = 0,
              Height = 0,
              FormBorderStyle = FormBorderStyle.None,
              ShowInTaskbar = false
          };

          form.Load += (sender, args) => formLoaded.Set();

          Application.Run(form);
      });

      uiThread.SetApartmentState(ApartmentState.STA);
      uiThread.Start();

      // The label is assigned before the form loads, so waiting on Load makes it safe to return.
      formLoaded.WaitOne();
      return label;
  }
  108. #endif
  109. #if HAS_DISPATCHER
  /// <summary>
  /// Observes a range produced on a new thread on a dispatcher and checks, on completion, that
  /// every OnNext callback saw a <see cref="System.Windows.Threading.DispatcherSynchronizationContext"/>.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void ObserveOn_Dispatcher()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var onDispatcherContext = true;

          Observable.Range(0, 10, NewThreadScheduler.Default)
              .ObserveOn(dispatcher)
              .Subscribe(
                  value =>
                  {
                      onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                  },
                  () =>
                  {
                      Assert.True(onDispatcherContext);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  });
      });
  }
  /// <summary>
  /// Same scenario as <c>ObserveOn_Dispatcher</c>, but the dispatcher is wrapped in an explicit
  /// <c>DispatcherScheduler</c> before being handed to ObserveOn.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void ObserveOn_DispatcherScheduler()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var onDispatcherContext = true;

          Observable.Range(0, 10, NewThreadScheduler.Default)
              .ObserveOn(new DispatcherScheduler(dispatcher))
              .Subscribe(
                  value =>
                  {
                      onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                  },
                  () =>
                  {
                      Assert.True(onDispatcherContext);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  });
      });
  }
  /// <summary>
  /// Subscribes from within a dispatcher callback using <c>ObserveOnDispatcher</c> and checks, on
  /// completion, that every OnNext callback saw a
  /// <see cref="System.Windows.Threading.DispatcherSynchronizationContext"/>.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void ObserveOn_CurrentDispatcher()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var onDispatcherContext = true;

          // The subscription itself is set up from a dispatcher callback.
          var subscribe = new Action(() =>
          {
              Observable.Range(0, 10, NewThreadScheduler.Default)
                  .ObserveOnDispatcher()
                  .Subscribe(
                      value =>
                      {
                          onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                      },
                      () =>
                      {
                          Assert.True(onDispatcherContext);
                          dispatcher.InvokeShutdown();
                          done.Set();
                      });
          });

          dispatcher.BeginInvoke(subscribe);
      });
  }
  /// <summary>
  /// Subscribes to a failing sequence through <c>ObserveOnDispatcher</c> from within a dispatcher
  /// callback and checks that the very same exception instance reaches the OnError handler.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void ObserveOn_Error()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var expected = new Exception();
          var onDispatcherContext = true;

          var subscribe = new Action(() =>
          {
              Observable.Throw<int>(expected)
                  .ObserveOnDispatcher()
                  .Subscribe(
                      value =>
                      {
                          onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                      },
                      error =>
                      {
                          Assert.True(onDispatcherContext);
                          Assert.Same(expected, error);
                          dispatcher.InvokeShutdown();
                          done.Set();
                      },
                      () =>
                      {
                          // Completion is not expected for a failing sequence.
                          // NOTE(review): the assertion presumably throws before the shutdown and
                          // signal below can run - confirm RunAsync cannot hang in that case.
                          Assert.True(false);
                          dispatcher.InvokeShutdown();
                          done.Set();
                      });
          });

          dispatcher.BeginInvoke(subscribe);
      });
  }
  201. #endif
  202. #endregion
  203. #region SubscribeOn
  /// <summary>
  /// Verifies that the SubscribeOn overloads targeting a WinForms control, a dispatcher or a
  /// <see cref="SynchronizationContext"/> throw <see cref="ArgumentNullException"/> when either
  /// the source or the target is null. Framework-specific overloads are only exercised when
  /// the matching compilation symbol is defined.
  /// </summary>
  [Fact]
  public void SubscribeOn_ArgumentChecking()
  {
      var source = Observable.Empty<int>();
      var nullSource = default(IObservable<int>);

#if HAS_WINFORMS
      // WinForms: scheduler-based and control-based overloads.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(nullSource, new ControlScheduler(new Label())));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(source, default(ControlScheduler)));

      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.SubscribeOn<int>(nullSource, new Label()));
      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.SubscribeOn<int>(source, default(Label)));
#endif

#if HAS_DISPATCHER
      // Dispatcher: scheduler-based overloads.
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(nullSource, new DispatcherScheduler(System.Windows.Deployment.Current.Dispatcher)));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(nullSource, new DispatcherScheduler(Dispatcher.CurrentDispatcher)));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(source, default(DispatcherScheduler)));

      // Dispatcher: dispatcher-based overloads.
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOn<int>(nullSource, System.Windows.Deployment.Current.Dispatcher));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOn<int>(nullSource, Dispatcher.CurrentDispatcher));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOn<int>(source, default(Dispatcher)));
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOnDispatcher<int>(nullSource));
#endif

      // SynchronizationContext overload.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(nullSource, new SynchronizationContext()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(source, default(SynchronizationContext)));
  }
  232. #if HAS_WINFORMS
  /// <summary>
  /// Subscribes via a WinForms label and checks that both the subscribe logic and the
  /// unsubscribe logic ran with a
  /// <see cref="System.Windows.Forms.WindowsFormsSynchronizationContext"/> as the current context.
  /// </summary>
  [Fact]
  public void SubscribeOn_Control()
  {
      var label = CreateLabel();
      var subscribed = new ManualResetEvent(false);
      var unsubscribed = new ManualResetEvent(false);
      var onUiContext = true;

      var source = Observable.Create<int>(observer =>
      {
          label.Text = "Subscribe";
          onUiContext &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
          subscribed.Set();

          return () =>
          {
              label.Text = "Unsubscribe";
              onUiContext &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
              unsubscribed.Set();
          };
      });

      var subscription = source
          .SubscribeOn(label)
          .Subscribe(_ => { });

      // Dispose only after the subscribe logic has run, then wait for the unsubscribe logic.
      subscribed.WaitOne();
      subscription.Dispose();
      unsubscribed.WaitOne();

      Application.Exit();
      Assert.True(onUiContext);
  }
  /// <summary>
  /// Same scenario as <c>SubscribeOn_Control</c>, but the label is wrapped in an explicit
  /// <c>ControlScheduler</c> before being handed to SubscribeOn.
  /// </summary>
  [Fact]
  public void SubscribeOn_ControlScheduler()
  {
      var label = CreateLabel();
      var subscribed = new ManualResetEvent(false);
      var unsubscribed = new ManualResetEvent(false);
      var onUiContext = true;

      var source = Observable.Create<int>(observer =>
      {
          label.Text = "Subscribe";
          onUiContext &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
          subscribed.Set();

          return () =>
          {
              label.Text = "Unsubscribe";
              onUiContext &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
              unsubscribed.Set();
          };
      });

      var subscription = source
          .SubscribeOn(new ControlScheduler(label))
          .Subscribe(_ => { });

      // Dispose only after the subscribe logic has run, then wait for the unsubscribe logic.
      subscribed.WaitOne();
      subscription.Dispose();
      unsubscribed.WaitOne();

      Application.Exit();
      Assert.True(onUiContext);
  }
  287. #endif
  288. #if HAS_DISPATCHER
  /// <summary>
  /// Subscribes on a dispatcher and checks that both the subscribe logic and the unsubscribe
  /// logic saw a <see cref="System.Windows.Threading.DispatcherSynchronizationContext"/>.
  /// An <c>AsyncSubject</c> signalled from the subscribe logic triggers the disposal.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void SubscribeOn_Dispatcher()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var subscribed = new AsyncSubject<Unit>();
          var onDispatcherContext = true;

          var subscription = Observable.Create<int>(observer =>
              {
                  onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                  subscribed.OnNext(Unit.Default);
                  subscribed.OnCompleted();

                  return () =>
                  {
                      onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                      Assert.True(onDispatcherContext);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  };
              })
              .SubscribeOn(dispatcher)
              .Subscribe(_ => { });

          // Dispose once the subscribe logic has announced itself.
          subscribed.Subscribe(_ => subscription.Dispose());
      });
  }
  /// <summary>
  /// Same scenario as <c>SubscribeOn_Dispatcher</c>, but the dispatcher is wrapped in an explicit
  /// <c>DispatcherScheduler</c> before being handed to SubscribeOn.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void SubscribeOn_DispatcherScheduler()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var subscribed = new AsyncSubject<Unit>();
          var onDispatcherContext = true;

          var subscription = Observable.Create<int>(observer =>
              {
                  onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                  subscribed.OnNext(Unit.Default);
                  subscribed.OnCompleted();

                  return () =>
                  {
                      onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                      Assert.True(onDispatcherContext);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  };
              })
              .SubscribeOn(new DispatcherScheduler(dispatcher))
              .Subscribe(_ => { });

          // Dispose once the subscribe logic has announced itself.
          subscribed.Subscribe(_ => subscription.Dispose());
      });
  }
  /// <summary>
  /// Sets up a <c>SubscribeOnDispatcher</c> subscription from within a dispatcher callback and
  /// checks that both the subscribe logic and the unsubscribe logic saw a
  /// <see cref="System.Windows.Threading.DispatcherSynchronizationContext"/>.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void SubscribeOn_CurrentDispatcher()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var subscribed = new AsyncSubject<Unit>();
          var onDispatcherContext = true;

          var setup = new Action(() =>
          {
              var subscription = Observable.Create<int>(observer =>
                  {
                      onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                      subscribed.OnNext(Unit.Default);
                      subscribed.OnCompleted();

                      return () =>
                      {
                          onDispatcherContext &= (SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext);
                          Assert.True(onDispatcherContext);
                          dispatcher.InvokeShutdown();
                          done.Set();
                      };
                  })
                  .SubscribeOnDispatcher()
                  .Subscribe(_ => { });

              // Dispose once the subscribe logic has announced itself.
              subscribed.Subscribe(_ => subscription.Dispose());
          });

          dispatcher.BeginInvoke(setup);
      });
  }
  373. #endif
  374. #endregion
  375. #region + Synchronize +
  /// <summary>
  /// Verifies that both Synchronize overloads throw <see cref="ArgumentNullException"/> for a
  /// null source, and that the gate overload also rejects a null gate.
  /// </summary>
  [Fact]
  public void Synchronize_ArgumentChecking()
  {
      var source = Observable.Empty<int>();
      var nullSource = default(IObservable<int>);

      // Null source, with and without an explicit gate.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(nullSource));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(nullSource, new object()));

      // Null gate.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(source, null));
  }
  384. #if !NO_THREAD
  /// <summary>
  /// Checks that a sequence synchronized on a gate makes no progress while the test thread
  /// holds that gate: the counter stays at zero during the initial hold and does not move
  /// during any later hold.
  /// </summary>
  [Fact]
  public void Synchronize_Range()
  {
      int received = 0;
      bool gateReleased = true;
      var gate = new object();

      lock (gate)
      {
          gateReleased = false;

          Observable.Range(0, 100, NewThreadScheduler.Default)
              .Synchronize(gate)
              .Subscribe(x => received++, () => { Assert.True(gateReleased); });

          // Give the producer time to run; nothing may be delivered while the gate is held.
          Thread.Sleep(100);
          Assert.Equal(0, received);

          gateReleased = true;
      }

      while (received < 100)
      {
          Thread.Sleep(10);

          lock (gate)
          {
              // While the gate is held again, the counter must not move.
              int snapshot = received;
              Thread.Sleep(100);
              Assert.Equal(snapshot, received);
          }
      }
  }
  /// <summary>
  /// Checks that an error from a sequence synchronized on a gate is not delivered while the
  /// test thread holds that gate, and that the same exception instance is delivered afterwards.
  /// </summary>
  [Fact]
  public void Synchronize_Throw()
  {
      var ex = new Exception();
      var resLock = new object();
      var e = default(Exception);
      bool outsideLock = true;
      var gate = new object();

      lock (gate)
      {
          outsideLock = false;

          Observable.Throw<int>(ex, NewThreadScheduler.Default)
              .Synchronize(gate)
              .Subscribe(
                  x => { Assert.True(false); },
                  err => { lock (resLock) { e = err; } },
                  () => { Assert.True(outsideLock); });

          // Give the producer time to run; the error may not be delivered while the gate is held.
          Thread.Sleep(100);
          Assert.Null(e);

          outsideLock = true;
      }

      // Poll for the error with a short pause between checks, as Synchronize_Range does,
      // rather than spinning on the lock at full speed.
      var delivered = false;
      while (!delivered)
      {
          lock (resLock)
          {
              delivered = e != null;
          }

          if (!delivered)
          {
              Thread.Sleep(10);
          }
      }

      Assert.Same(ex, e);
  }
  /// <summary>
  /// Feeds an observer from two threads at once (values 0..99 and 100..199) and checks that,
  /// after Synchronize, the sum accumulated by the subscriber equals the sum of 0..199.
  /// </summary>
  [Fact]
  public void Synchronize_BadObservable()
  {
      var source = Observable.Create<int>(observer =>
      {
          var firstProducer = new Thread(() =>
          {
              for (int value = 0; value < 100; value++)
              {
                  observer.OnNext(value);
              }
          });

          var secondProducer = new Thread(() =>
          {
              // Start the first producer from here so both loops overlap.
              firstProducer.Start();

              for (int value = 100; value < 200; value++)
              {
                  observer.OnNext(value);
              }

              firstProducer.Join();
              observer.OnCompleted();
          });

          secondProducer.Start();

          return () => { };
      });

      var completed = new ManualResetEvent(false);
      int total = 0;

      source.Synchronize().Subscribe(x => total += x, () => { completed.Set(); });

      completed.WaitOne();

      Assert.Equal(Enumerable.Range(0, 200).Sum(), total);
  }
  466. #endif
  467. #endregion
  468. }
  469. public class ObservableConcurrencyReactiveTest : ReactiveTest
  470. {
  471. #region + ObserveOn +
  /// <summary>
  /// Verifies that the scheduler-based ObserveOn overload throws
  /// <see cref="ArgumentNullException"/> for a null source and for a null scheduler.
  /// </summary>
  [Fact]
  public void ObserveOn_Scheduler_ArgumentChecking()
  {
      var nullSource = default(IObservable<int>);
      var nullScheduler = default(IScheduler);

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn(nullSource, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn(DummyObservable<int>.Instance, nullScheduler));
  }
  /// <summary>
  /// Observes a completing hot sequence on the test scheduler and expects every notification
  /// raised after subscription to be recorded one tick after its source time.
  /// </summary>
  [Fact]
  public void ObserveOn_Scheduler_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      var observer = testScheduler.Start(() => source.ObserveOn(testScheduler));

      observer.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6),
          OnCompleted<int>(531)
      );

#if !NO_PERF && !NO_CDS
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      source.Subscriptions.AssertEqual(
          Subscribe(200, 531)
      );
#else
      //
      // TODO: Check platform discrepancies
      //
      //source.Subscriptions.AssertEqual(
      //    Subscribe(200, 1000)
      //);
#endif
  }
  /// <summary>
  /// Observes a failing hot sequence on the test scheduler and expects every notification
  /// raised after subscription, including the error, to be recorded one tick after its
  /// source time.
  /// </summary>
  [Fact]
  public void ObserveOn_Scheduler_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnError<int>(530, failure)
      );

      var observer = testScheduler.Start(() => source.ObserveOn(testScheduler));

      observer.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6),
          OnError<int>(531, failure)
      );

#if !NO_PERF && !NO_CDS
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      source.Subscriptions.AssertEqual(
          Subscribe(200, 531)
      );
#else
      //
      // TODO: Check platform discrepancies
      //
      //source.Subscriptions.AssertEqual(
      //    Subscribe(200, 1000)
      //);
#endif
  }
  /// <summary>
  /// Observes a hot sequence that never terminates on the test scheduler; expects the values
  /// raised after subscription one tick late and a subscription spanning ticks 200 to 1000.
  /// </summary>
  [Fact]
  public void ObserveOn_Scheduler_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6)
      );

      var observer = testScheduler.Start(() => source.ObserveOn(testScheduler));

      observer.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Observes two values raised at the same tick (210) on the test scheduler and expects them
  /// to be recorded in order at ticks 211 and 212.
  /// </summary>
  [Fact]
  public void ObserveOn_Scheduler_SameTime()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(210, 2)
      );

      var observer = testScheduler.Start(() => source.ObserveOn(testScheduler));

      observer.Messages.AssertEqual(
          OnNext(211, 1),
          OnNext(212, 2)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Subscribes with an OnNext handler that throws for the value 5 and checks that the
  /// exception was captured by the <c>MyScheduler</c> instance the sequence is observed on.
  /// </summary>
  [Fact]
  public void ObserveOn_Scheduler_OnNextThrows()
  {
      var exceptionCaught = new ManualResetEvent(false);
      var capturingScheduler = new MyScheduler(exceptionCaught);

      Action<int> throwOnFive = value =>
      {
          if (value == 5)
          {
              throw new Exception();
          }
      };

      Observable.Range(0, 10, Scheduler.Default)
          .ObserveOn(capturingScheduler)
          .Subscribe(throwOnFive);

      // MyScheduler signals the event from its catch block.
      exceptionCaught.WaitOne();

      Assert.NotNull(capturingScheduler._exception);
  }
  /// <summary>
  /// Test scheduler that runs immediately scheduled work inline on the caller and records any
  /// exception that work throws, signalling an event when that happens. The clock and the
  /// time-based Schedule overloads are not implemented.
  /// </summary>
  private class MyScheduler : IScheduler
  {
      /// <summary>The exception caught from scheduled work, or null if none was caught.</summary>
      internal Exception _exception;

      // Signalled once an exception from scheduled work has been recorded.
      private readonly ManualResetEvent _evt;

      /// <summary>Creates the scheduler.</summary>
      /// <param name="e">Event to set when scheduled work throws.</param>
      public MyScheduler(ManualResetEvent e)
      {
          _evt = e;
      }

      /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
      public DateTimeOffset Now
      {
          get { throw new NotImplementedException(); }
      }

      /// <summary>
      /// Invokes <paramref name="action"/> synchronously. If it throws, the exception is stored
      /// in <see cref="_exception"/>, the event is set and an empty disposable is returned.
      /// </summary>
      public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
      {
          try
          {
              return action(this, state);
          }
          catch (Exception ex)
          {
              // Deliberately broad: the test only cares that the failure surfaced here.
              _exception = ex;
              _evt.Set();
              return Disposable.Empty;
          }
      }

      /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
      public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          throw new NotImplementedException();
      }

      /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
      public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          throw new NotImplementedException();
      }
  }
  646. #if !NO_PERF && !NO_CDS
  /// <summary>
  /// Pushes three values and a completion through ObserveOn on a <c>TestLongRunningScheduler</c>
  /// and checks that the subscriber received exactly 1, 2, 3 in order.
  /// </summary>
  [Fact]
  public void ObserveOn_LongRunning_Simple()
  {
      var startedEvent = default(ManualResetEvent);
      var stoppedEvent = default(ManualResetEvent);
      var longRunning = new TestLongRunningScheduler(e => startedEvent = e, e => stoppedEvent = e);

      var subject = new Subject<int>();
      var completed = new ManualResetEvent(false);
      var received = new List<int>();

      subject.ObserveOn(longRunning).Subscribe(received.Add, () => completed.Set());

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnNext(3);
      subject.OnCompleted();

      completed.WaitOne();

      Assert.True(received.SequenceEqual(new[] { 1, 2, 3 }));
  }
  /// <summary>
  /// Pushes three values and an error through ObserveOn on a <c>TestLongRunningScheduler</c>
  /// and checks that the subscriber's OnError received the same exception instance.
  /// </summary>
  [Fact]
  public void ObserveOn_LongRunning_Error()
  {
      var startedEvent = default(ManualResetEvent);
      var stoppedEvent = default(ManualResetEvent);
      var longRunning = new TestLongRunningScheduler(e => startedEvent = e, e => stoppedEvent = e);

      var subject = new Subject<int>();
      var failed = new ManualResetEvent(false);
      var observedError = default(Exception);

      subject.ObserveOn(longRunning).Subscribe(
          _ => { },
          error =>
          {
              observedError = error;
              failed.Set();
          });

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnNext(3);

      var expectedError = new Exception();
      subject.OnError(expectedError);

      failed.WaitOne();

      Assert.Same(expectedError, observedError);
  }
  682. #if !NO_THREAD
  /// <summary>
  /// Pushes a thousand values with occasional pauses through ObserveOn on a
  /// <c>TestLongRunningScheduler</c> and waits for the completion to reach the subscriber.
  /// </summary>
  [Fact]
  public void ObserveOn_LongRunning_TimeVariance()
  {
      var startedEvent = default(ManualResetEvent);
      var stoppedEvent = default(ManualResetEvent);
      var longRunning = new TestLongRunningScheduler(e => startedEvent = e, e => stoppedEvent = e);

      var subject = new Subject<int>();
      var completed = new ManualResetEvent(false);

      subject.ObserveOn(longRunning).Subscribe(_ => { }, () => completed.Set());

      subject.OnNext(1); // Ensure active
      startedEvent.WaitOne();

      Thread.Sleep(100); // Try to enter the dispatcher event wait state

      for (int value = 0; value < 1000; value++)
      {
          // Pause briefly on every hundredth value to vary the arrival timing.
          if (value % 100 == 0)
          {
              Thread.Sleep(10);
          }

          subject.OnNext(value);
      }

      subject.OnCompleted();

      completed.WaitOne();
  }
  704. #endif
  /// <summary>
  /// Blocks the subscriber inside its first OnNext, pushes two more values and an error in the
  /// meantime, then releases it and checks that 1, 2, 3 and the same exception were delivered.
  /// </summary>
  [Fact]
  public void ObserveOn_LongRunning_HoldUpDuringDispatchAndFail()
  {
      var startedEvent = default(ManualResetEvent);
      var stoppedEvent = default(ManualResetEvent);
      var longRunning = new TestLongRunningScheduler(e => startedEvent = e, e => stoppedEvent = e);

      var subject = new Subject<int>();
      var insideOnNext = new ManualResetEvent(false);
      var release = new ManualResetEvent(false);
      var received = new List<int>();
      var observedError = default(Exception);
      var failed = new ManualResetEvent(false);

      subject.ObserveOn(longRunning).Subscribe(
          value =>
          {
              received.Add(value);
              insideOnNext.Set();
              release.WaitOne();
          },
          error =>
          {
              observedError = error;
              failed.Set();
          });

      subject.OnNext(1);
      insideOnNext.WaitOne();

      // The subscriber is now parked in OnNext; queue more work behind it.
      subject.OnNext(2);
      subject.OnNext(3);

      var expectedError = new Exception();
      subject.OnError(expectedError);

      release.Set();
      failed.WaitOne();

      Assert.True(received.SequenceEqual(new[] { 1, 2, 3 }));
      Assert.Same(expectedError, observedError);
  }
  /// <summary>
  /// Disposes an ObserveOn subscription on a <c>TestLongRunningScheduler</c> after at least one
  /// value was delivered, waits for the scheduler's stopped event, and checks that a value
  /// pushed afterwards (4) never reaches the subscriber.
  /// </summary>
  [Fact]
  public void ObserveOn_LongRunning_Cancel()
  {
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

      var s = new Subject<int>();
      var lst = new List<int>();
      var running = new ManualResetEvent(false);

      var d = s.ObserveOn(scheduler).Subscribe(x => { lst.Add(x); running.Set(); });

      s.OnNext(0);
      started.WaitOne();

      s.OnNext(1);
      s.OnNext(2);
      s.OnNext(3);
      running.WaitOne();

      // Cancel and wait for the stopped event before pushing the value that must be dropped.
      d.Dispose();
      stopped.WaitOne();

      s.OnNext(4);

      Assert.True(lst.Count > 0 && !lst.Contains(4));
  }
  /// <summary>
  /// Subscribes through ObserveOn on a <c>TestLongRunningScheduler</c> with an OnNext handler that
  /// throws for the value 3, and checks that the scheduler's exception callback received a
  /// non-null exception once the stopped event was signalled.
  /// </summary>
  [Fact]
  public void ObserveOn_LongRunning_OnNextThrows()
  {
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var exception = default(Exception);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e, ex => exception = ex);

      var s = new Subject<int>();
      var lst = new List<int>();
      var running = new ManualResetEvent(false);

      // The subscription handle is not needed: the test never disposes it explicitly.
      s.ObserveOn(scheduler).Subscribe(x =>
      {
          lst.Add(x);
          running.Set();

          if (x == 3)
          {
              throw new Exception();
          }
      });

      s.OnNext(0);
      started.WaitOne();

      s.OnNext(1);
      s.OnNext(2);
      s.OnNext(3);
      running.WaitOne();

      s.OnNext(4);
      stopped.WaitOne();

      Assert.NotNull(exception);
  }
  773. #endif
  774. #if !NO_SYNCCTX
  /// <summary>
  /// Observes a completing hot sequence on a <c>MyCtx</c> synchronization context backed by the
  /// test scheduler; expects every notification raised after subscription one tick late and
  /// a subscription spanning ticks 200 to 531.
  /// </summary>
  [Fact]
  public void ObserveOn_SynchronizationContext_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      var observer = testScheduler.Start(() => source.ObserveOn(new MyCtx(testScheduler)));

      observer.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6),
          OnCompleted<int>(531)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 531)
      );
  }
  802. #endif
  803. #endregion
  804. #region SubscribeOn
  /// <summary>
  /// Verifies that the scheduler-based SubscribeOn overload throws
  /// <see cref="ArgumentNullException"/> for a null source and for a null scheduler.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_ArgumentChecking()
  {
      var nullSource = default(IObservable<int>);
      var nullScheduler = default(IScheduler);

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn(nullSource, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn(DummyObservable<int>.Instance, nullScheduler));
  }
  /// <summary>
  /// Records the test scheduler's clock when the source's subscribe logic and its dispose
  /// logic run under SubscribeOn; expects ticks 201 and 1001 and no messages.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_Sleep()
  {
      var testScheduler = new TestScheduler();

      var subscribedAt = 0L;
      var disposedAt = 0L;

      var source = Observable.Create<long>(observer =>
      {
          subscribedAt = testScheduler.Clock;
          return () => { disposedAt = testScheduler.Clock; };
      });

      var results = testScheduler.Start(() => source.SubscribeOn(testScheduler));

      // The source never emits anything.
      results.Messages.AssertEqual(
      );

      Assert.Equal(201, subscribedAt);
      Assert.Equal(1001, disposedAt);
  }
  /// <summary>
  /// Subscribes to a hot sequence that completes at tick 300 via SubscribeOn on the test
  /// scheduler; expects the completion at 300 and a subscription spanning ticks 201 to 301.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnCompleted<long>(300)
      );

      var observer = testScheduler.Start(() => source.SubscribeOn(testScheduler));

      observer.Messages.AssertEqual(
          OnCompleted<long>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(201, 301)
      );
  }
  /// <summary>
  /// Subscribes to a hot sequence that fails at tick 300 via SubscribeOn on the test
  /// scheduler; expects the error at 300 and a subscription spanning ticks 201 to 301.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnError<int>(300, failure)
      );

      var observer = testScheduler.Start(() => source.SubscribeOn(testScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(300, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(201, 301)
      );
  }
  /// <summary>
  /// Subscribes to an empty, never-terminating hot sequence via SubscribeOn on the test
  /// scheduler; expects no messages and a subscription spanning ticks 201 to 1001.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
      );

      var observer = testScheduler.Start(() => source.SubscribeOn(testScheduler));

      observer.Messages.AssertEqual(
      );

      source.Subscriptions.AssertEqual(
          Subscribe(201, 1001)
      );
  }
  880. #if !NO_SYNCCTX
  /// <summary>
  /// Subscribes to a completing hot sequence via SubscribeOn on a <c>MyCtx</c> synchronization
  /// context backed by the test scheduler; expects notifications at their source times and a
  /// subscription spanning ticks 201 to 531.
  /// </summary>
  [Fact]
  public void SubscribeOn_SynchronizationContext_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      var observer = testScheduler.Start(() => source.SubscribeOn(new MyCtx(testScheduler)));

      observer.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(201, 531)
      );
  }
  908. #endif
  909. #endregion
  910. #region |> Helpers <|
  911. #if !NO_SYNCCTX
  /// <summary>
  /// Synchronization context whose <see cref="Post"/> forwards each callback to an
  /// <see cref="IScheduler"/> supplied at construction.
  /// </summary>
  private class MyCtx : SynchronizationContext
  {
      // Scheduler that posted callbacks are handed to; fixed at construction.
      private readonly IScheduler scheduler;

      /// <summary>Creates the context.</summary>
      /// <param name="scheduler">Scheduler that will run posted callbacks.</param>
      public MyCtx(IScheduler scheduler)
      {
          this.scheduler = scheduler;
      }

      /// <summary>
      /// Schedules <paramref name="d"/> with <paramref name="state"/> on the wrapped scheduler.
      /// The disposable returned by the scheduler is not kept.
      /// </summary>
      public override void Post(SendOrPostCallback d, object state)
      {
          scheduler.Schedule(state, (self, s) => { d(s); return Disposable.Empty; });
      }
  }
  924. #endif
  925. #endregion
  926. }
  927. }