ObservableConcurrencyTest.cs 34 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Disposables;
  10. using System.Reactive.Linq;
  11. using System.Reactive.Subjects;
  12. using System.Threading;
  13. #if NET46
  14. using System.Windows.Threading;
  15. #endif
  16. using Microsoft.Reactive.Testing;
  17. using Xunit;
  18. using ReactiveTests.Dummies;
  19. #if HAS_WINFORMS
  20. using System.Windows.Forms;
  21. #endif
  22. namespace ReactiveTests.Tests
  23. {
  24. public partial class ObservableConcurrencyTest : TestBase
  25. {
  26. #region + ObserveOn +
  [Fact]
  public void ObserveOn_ArgumentChecking()
  {
      // Each ObserveOn overload is called once with a null source and once with a
      // null scheduler/control/dispatcher/context; every call must throw
      // ArgumentNullException.
      IObservable<int> nullSource = null;
      var source = Observable.Empty<int>();

#if HAS_WINFORMS
      // Windows Forms overloads (ControlScheduler and Control).
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new ControlScheduler(new Label())));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(source, default(ControlScheduler)));
      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.ObserveOn<int>(nullSource, new Label()));
      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.ObserveOn<int>(source, default(Label)));
#endif

#if HAS_DISPATCHER
      // Dispatcher overloads; the dispatcher instance comes from a different
      // API depending on USE_SL_DISPATCHER.
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new DispatcherScheduler(System.Windows.Deployment.Current.Dispatcher)));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new DispatcherScheduler(Dispatcher.CurrentDispatcher)));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(source, default(DispatcherScheduler)));
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(nullSource, System.Windows.Deployment.Current.Dispatcher));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(nullSource, Dispatcher.CurrentDispatcher));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOn<int>(source, default(Dispatcher)));
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.ObserveOnDispatcher<int>(nullSource));
#endif

      // SynchronizationContext overload, available on every platform.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(nullSource, new SynchronizationContext()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn<int>(source, default(SynchronizationContext)));
  }
  55. #if HAS_WINFORMS
  [Fact]
  public void ObserveOn_Control()
  {
      // Values are produced on a new thread and observed through the label;
      // every OnNext must see a WindowsFormsSynchronizationContext as current.
      var label = CreateLabel();
      var completed = new ManualResetEvent(false);
      bool okay = true;

      var marshalled = Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOn(label);
      marshalled.Subscribe(
          x =>
          {
              label.Text = x.ToString();
              if (!(SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext))
              {
                  okay = false;
              }
          },
          () => completed.Set());

      // Wait for OnCompleted before calling Application.Exit and checking the flag.
      completed.WaitOne();
      Application.Exit();
      Assert.True(okay);
  }
  [Fact]
  public void ObserveOn_ControlScheduler()
  {
      // Same scenario as ObserveOn_Control, but the label is wrapped in an
      // explicit ControlScheduler instead of being passed directly.
      var label = CreateLabel();
      var completed = new ManualResetEvent(false);
      bool okay = true;

      var controlScheduler = new ControlScheduler(label);
      var marshalled = Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOn(controlScheduler);
      marshalled.Subscribe(
          x =>
          {
              label.Text = x.ToString();
              if (!(SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext))
              {
                  okay = false;
              }
          },
          () => completed.Set());

      // Wait for OnCompleted before calling Application.Exit and checking the flag.
      completed.WaitOne();
      Application.Exit();
      Assert.True(okay);
  }
  // Creates a Label hosted in a borderless, zero-sized form whose message loop
  // runs on a dedicated STA thread. Returns once the form's Load event has fired.
  private Label CreateLabel()
  {
      var formLoaded = new ManualResetEvent(false);
      Label label = null;

      var uiThread = new Thread(() =>
      {
          label = new Label();

          var form = new Form();
          form.Controls.Add(label);
          form.Width = 0;
          form.Height = 0;
          form.FormBorderStyle = FormBorderStyle.None;
          form.ShowInTaskbar = false;
          form.Load += (sender, args) => formLoaded.Set();

          // Blocks this thread in the message loop for the form.
          Application.Run(form);
      });
      uiThread.SetApartmentState(ApartmentState.STA);
      uiThread.Start();

      formLoaded.WaitOne();
      return label;
  }
  105. #endif
  106. #if HAS_DISPATCHER
  [Fact]
  [Asynchronous]
  public void ObserveOn_Dispatcher()
  {
      // Values produced on a new thread are observed on the dispatcher; each
      // OnNext must run with a DispatcherSynchronizationContext as current.
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          bool okay = true;

          var marshalled = Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOn(dispatcher);
          marshalled.Subscribe(
              x =>
              {
                  if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                  {
                      okay = false;
                  }
              },
              () =>
              {
                  Assert.True(okay);
                  dispatcher.InvokeShutdown();
                  done.Set();
              });
      });
  }
  [Fact]
  [Asynchronous]
  public void ObserveOn_DispatcherScheduler()
  {
      // Same scenario as ObserveOn_Dispatcher, but the dispatcher is wrapped
      // in an explicit DispatcherScheduler.
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          bool okay = true;

          var dispatcherScheduler = new DispatcherScheduler(dispatcher);
          var marshalled = Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOn(dispatcherScheduler);
          marshalled.Subscribe(
              x =>
              {
                  if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                  {
                      okay = false;
                  }
              },
              () =>
              {
                  Assert.True(okay);
                  dispatcher.InvokeShutdown();
                  done.Set();
              });
      });
  }
  [Fact]
  [Asynchronous]
  public void ObserveOn_CurrentDispatcher()
  {
      // The subscription is set up from inside a dispatcher callback and uses
      // the parameterless ObserveOnDispatcher() overload.
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          bool okay = true;

          Action subscribeOnDispatcherThread = () =>
          {
              var marshalled = Observable.Range(0, 10, NewThreadScheduler.Default).ObserveOnDispatcher();
              marshalled.Subscribe(
                  x =>
                  {
                      if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                      {
                          okay = false;
                      }
                  },
                  () =>
                  {
                      Assert.True(okay);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  });
          };

          dispatcher.BeginInvoke(subscribeOnDispatcherThread);
      });
  }
  [Fact]
  [Asynchronous]
  public void ObserveOn_Error()
  {
      // An error raised by the source must reach the OnError handler with the
      // same exception instance; OnNext and OnCompleted are not expected.
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var expected = new Exception();
          bool okay = true;

          Action subscribeOnDispatcherThread = () =>
          {
              var marshalled = Observable.Throw<int>(expected).ObserveOnDispatcher();
              marshalled.Subscribe(
                  x =>
                  {
                      if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                      {
                          okay = false;
                      }
                  },
                  error =>
                  {
                      Assert.True(okay);
                      Assert.Same(expected, error);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  },
                  () =>
                  {
                      // NOTE(review): Assert.True(false) always throws, so the two
                      // statements below never run on this path - confirm whether
                      // the shutdown/signal should happen before the assertion.
                      Assert.True(false);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  });
          };

          dispatcher.BeginInvoke(subscribeOnDispatcherThread);
      });
  }
  198. #endif
  199. #endregion
  200. #region SubscribeOn
  [Fact]
  public void SubscribeOn_ArgumentChecking()
  {
      // Each SubscribeOn overload is called once with a null source and once with
      // a null scheduler/control/dispatcher/context; every call must throw
      // ArgumentNullException.
      IObservable<int> nullSource = null;
      var source = Observable.Empty<int>();

#if HAS_WINFORMS
      // Windows Forms overloads (ControlScheduler and Control).
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(nullSource, new ControlScheduler(new Label())));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(source, default(ControlScheduler)));
      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.SubscribeOn<int>(nullSource, new Label()));
      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.SubscribeOn<int>(source, default(Label)));
#endif

#if HAS_DISPATCHER
      // Dispatcher overloads; the dispatcher instance comes from a different
      // API depending on USE_SL_DISPATCHER.
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(nullSource, new DispatcherScheduler(System.Windows.Deployment.Current.Dispatcher)));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(nullSource, new DispatcherScheduler(Dispatcher.CurrentDispatcher)));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(source, default(DispatcherScheduler)));
#if USE_SL_DISPATCHER
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOn<int>(nullSource, System.Windows.Deployment.Current.Dispatcher));
#else
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOn<int>(nullSource, Dispatcher.CurrentDispatcher));
#endif
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOn<int>(source, default(Dispatcher)));
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOnDispatcher<int>(nullSource));
#endif

      // SynchronizationContext overload, available on every platform.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(nullSource, new SynchronizationContext()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(source, default(SynchronizationContext)));
  }
  229. #if HAS_WINFORMS
  [Fact]
  public void SubscribeOn_Control()
  {
      // Both the subscribe function and the returned unsubscribe action touch
      // the label and record whether a WindowsFormsSynchronizationContext is current.
      var label = CreateLabel();
      var subscribed = new ManualResetEvent(false);
      var unsubscribed = new ManualResetEvent(false);
      bool okay = true;

      var source = Observable.Create<int>(obs =>
      {
          label.Text = "Subscribe";
          if (!(SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext))
          {
              okay = false;
          }
          subscribed.Set();

          Action onDispose = () =>
          {
              label.Text = "Unsubscribe";
              if (!(SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext))
              {
                  okay = false;
              }
              unsubscribed.Set();
          };
          return onDispose;
      });

      var subscription = source.SubscribeOn(label).Subscribe(_ => { });

      // Dispose only after the subscribe function ran, then wait for the
      // unsubscribe action before checking the flag.
      subscribed.WaitOne();
      subscription.Dispose();
      unsubscribed.WaitOne();

      Application.Exit();
      Assert.True(okay);
  }
  [Fact]
  public void SubscribeOn_ControlScheduler()
  {
      // Same scenario as SubscribeOn_Control, but the label is wrapped in an
      // explicit ControlScheduler.
      var label = CreateLabel();
      var subscribed = new ManualResetEvent(false);
      var unsubscribed = new ManualResetEvent(false);
      bool okay = true;

      var source = Observable.Create<int>(obs =>
      {
          label.Text = "Subscribe";
          if (!(SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext))
          {
              okay = false;
          }
          subscribed.Set();

          Action onDispose = () =>
          {
              label.Text = "Unsubscribe";
              if (!(SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext))
              {
                  okay = false;
              }
              unsubscribed.Set();
          };
          return onDispose;
      });

      var subscription = source.SubscribeOn(new ControlScheduler(label)).Subscribe(_ => { });

      // Dispose only after the subscribe function ran, then wait for the
      // unsubscribe action before checking the flag.
      subscribed.WaitOne();
      subscription.Dispose();
      unsubscribed.WaitOne();

      Application.Exit();
      Assert.True(okay);
  }
  284. #endif
  285. #if HAS_DISPATCHER
  [Fact]
  [Asynchronous]
  public void SubscribeOn_Dispatcher()
  {
      // The subscribe function and the unsubscribe action both record whether a
      // DispatcherSynchronizationContext is current. The subject is signalled from
      // inside the subscribe function; its subscriber disposes the subscription.
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var subscribed = new AsyncSubject<Unit>();
          bool okay = true;

          var source = Observable.Create<int>(obs =>
          {
              if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
              {
                  okay = false;
              }
              subscribed.OnNext(Unit.Default);
              subscribed.OnCompleted();

              Action onDispose = () =>
              {
                  if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                  {
                      okay = false;
                  }
                  Assert.True(okay);
                  dispatcher.InvokeShutdown();
                  done.Set();
              };
              return onDispose;
          });

          var subscription = source.SubscribeOn(dispatcher).Subscribe(_ => { });

          subscribed.Subscribe(_ => subscription.Dispose());
      });
  }
  [Fact]
  [Asynchronous]
  public void SubscribeOn_DispatcherScheduler()
  {
      // Same scenario as SubscribeOn_Dispatcher, but the dispatcher is wrapped
      // in an explicit DispatcherScheduler.
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var subscribed = new AsyncSubject<Unit>();
          bool okay = true;

          var source = Observable.Create<int>(obs =>
          {
              if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
              {
                  okay = false;
              }
              subscribed.OnNext(Unit.Default);
              subscribed.OnCompleted();

              Action onDispose = () =>
              {
                  if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                  {
                      okay = false;
                  }
                  Assert.True(okay);
                  dispatcher.InvokeShutdown();
                  done.Set();
              };
              return onDispose;
          });

          var subscription = source.SubscribeOn(new DispatcherScheduler(dispatcher)).Subscribe(_ => { });

          subscribed.Subscribe(_ => subscription.Dispose());
      });
  }
  [Fact]
  [Asynchronous]
  public void SubscribeOn_CurrentDispatcher()
  {
      // The subscription is created from inside a dispatcher callback and uses
      // the parameterless SubscribeOnDispatcher() overload.
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(done =>
      {
          var subscribed = new AsyncSubject<Unit>();
          bool okay = true;

          Action subscribeOnDispatcherThread = () =>
          {
              var source = Observable.Create<int>(obs =>
              {
                  if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                  {
                      okay = false;
                  }
                  subscribed.OnNext(Unit.Default);
                  subscribed.OnCompleted();

                  Action onDispose = () =>
                  {
                      if (!(SynchronizationContext.Current is System.Windows.Threading.DispatcherSynchronizationContext))
                      {
                          okay = false;
                      }
                      Assert.True(okay);
                      dispatcher.InvokeShutdown();
                      done.Set();
                  };
                  return onDispose;
              });

              var subscription = source.SubscribeOnDispatcher().Subscribe(_ => { });

              subscribed.Subscribe(_ => subscription.Dispose());
          };

          dispatcher.BeginInvoke(subscribeOnDispatcherThread);
      });
  }
  370. #endif
  371. #endregion
  372. #region + Synchronize +
  [Fact]
  public void Synchronize_ArgumentChecking()
  {
      // Both Synchronize overloads must reject a null source, and the gate
      // overload must also reject a null gate.
      IObservable<int> nullSource = null;
      var source = Observable.Empty<int>();

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(nullSource));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(nullSource, new object()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Synchronize<int>(source, null));
  }
  381. #if !NO_THREAD
  [Fact]
  public void Synchronize_Range()
  {
      int received = 0;
      bool outsideLock = true;
      var gate = new object();

      // Subscribe while holding the gate: the test expects that no value is
      // counted for as long as this thread owns the lock.
      lock (gate)
      {
          outsideLock = false;
          Observable.Range(0, 100, NewThreadScheduler.Default)
              .Synchronize(gate)
              .Subscribe(x => { received++; }, () => { Assert.True(outsideLock); });
          Thread.Sleep(100);
          Assert.Equal(0, received);
          outsideLock = true;
      }

      // Until all 100 values have arrived, repeatedly grab the gate and check
      // that the counter does not move while it is held.
      while (received < 100)
      {
          Thread.Sleep(10);
          lock (gate)
          {
              int snapshot = received;
              Thread.Sleep(100);
              Assert.Equal(snapshot, received);
          }
      }
  }
  [Fact]
  public void Synchronize_Throw()
  {
      // An error produced on another thread must not be delivered while the test
      // thread holds the gate, and must be delivered (same instance) afterwards.
      var ex = new Exception();
      var e = default(Exception);
      bool outsideLock = true;
      var gate = new object();

      // Signalled by the error handler. Waiting on an event replaces the former
      // busy loop, which spun on a lock the error handler also had to acquire.
      var errorSeen = new ManualResetEvent(false);

      lock (gate)
      {
          outsideLock = false;
          Observable.Throw<int>(ex, NewThreadScheduler.Default).Synchronize(gate).Subscribe(
              x => { Assert.True(false); },
              err => { e = err; errorSeen.Set(); },
              () => { Assert.True(outsideLock); });
          Thread.Sleep(100);
          Assert.Null(e);
          outsideLock = true;
      }

      // The handler writes the field before calling Set, so it is safe to read
      // once WaitOne has returned.
      errorSeen.WaitOne();

      Assert.Same(ex, e);
  }
  [Fact]
  public void Synchronize_BadObservable()
  {
      // The source calls OnNext from two threads without any coordination;
      // after Synchronize() the unsynchronized sum must still equal 0 + 1 + ... + 199.
      var unsafeSource = Observable.Create<int>(obs =>
      {
          var lowProducer = new Thread(() =>
          {
              for (int value = 0; value < 100; value++)
              {
                  obs.OnNext(value);
              }
          });

          var highProducer = new Thread(() =>
          {
              // Start the other producer first so both loops run concurrently.
              lowProducer.Start();
              for (int value = 100; value < 200; value++)
              {
                  obs.OnNext(value);
              }

              // Complete only after both producers have finished.
              lowProducer.Join();
              obs.OnCompleted();
          });
          highProducer.Start();

          return () => { };
      });

      var completed = new ManualResetEvent(false);
      int sum = 0;

      unsafeSource.Synchronize().Subscribe(x => { sum += x; }, () => completed.Set());
      completed.WaitOne();

      Assert.Equal(Enumerable.Range(0, 200).Sum(), sum);
  }
  463. #endif
  464. #endregion
  465. }
  466. public class ObservableConcurrencyReactiveTest : ReactiveTest
  467. {
  468. #region + ObserveOn +
  [Fact]
  public void ObserveOn_Scheduler_ArgumentChecking()
  {
      // ObserveOn(source, scheduler) must reject a null source and a null scheduler.
      IObservable<int> nullSource = null;
      IScheduler nullScheduler = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn(nullSource, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ObserveOn(DummyObservable<int>.Instance, nullScheduler));
  }
  [Fact]
  public void ObserveOn_Scheduler_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      var observed = testScheduler.Start(() => source.ObserveOn(testScheduler));

      // The values at 90 and 120 are not expected; every other notification is
      // expected one tick after its source time.
      observed.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6),
          OnCompleted<int>(531)
      );

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      source.Subscriptions.AssertEqual(
          Subscribe(200, 531)
      );
#else
      //
      // TODO: Check platform discrepancies
      //
      //source.Subscriptions.AssertEqual(
      //    Subscribe(200, 1000)
      //);
#endif
  }
  [Fact]
  public void ObserveOn_Scheduler_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnError<int>(530, failure)
      );

      var observed = testScheduler.Start(() => source.ObserveOn(testScheduler));

      // The values at 90 and 120 are not expected; every other notification,
      // including the error, is expected one tick after its source time.
      observed.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6),
          OnError<int>(531, failure)
      );

#if !NO_PERF
      // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      source.Subscriptions.AssertEqual(
          Subscribe(200, 531)
      );
#else
      //
      // TODO: Check platform discrepancies
      //
      //source.Subscriptions.AssertEqual(
      //    Subscribe(200, 1000)
      //);
#endif
  }
  [Fact]
  public void ObserveOn_Scheduler_Dispose()
  {
      var testScheduler = new TestScheduler();

      // The source never terminates, so the subscription is expected to last
      // until 1000.
      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6)
      );

      var observed = testScheduler.Start(() => source.ObserveOn(testScheduler));

      observed.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  [Fact]
  public void ObserveOn_Scheduler_SameTime()
  {
      var testScheduler = new TestScheduler();

      // Two values are published at the same tick (210).
      var source = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(210, 2)
      );

      var observed = testScheduler.Start(() => source.ObserveOn(testScheduler));

      // They are expected in order on consecutive ticks.
      observed.Messages.AssertEqual(
          OnNext(211, 1),
          OnNext(212, 2)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  [Fact]
  public void ObserveOn_Scheduler_OnNextThrows()
  {
      // MyScheduler records any exception thrown by a scheduled action and then
      // sets the event; the observer below throws when it receives the value 5.
      var exceptionCaught = new ManualResetEvent(false);
      var recordingScheduler = new MyScheduler(exceptionCaught);

      var marshalled = Observable.Range(0, 10, Scheduler.Default).ObserveOn(recordingScheduler);
      marshalled.Subscribe(x =>
      {
          if (x == 5)
          {
              throw new Exception();
          }
      });

      exceptionCaught.WaitOne();
      Assert.NotNull(recordingScheduler._exception);
  }
  // Minimal IScheduler used by ObserveOn_Scheduler_OnNextThrows: the immediate
  // Schedule overload runs the action inline and captures any exception it throws;
  // the Now property and the two timed overloads are not implemented.
  class MyScheduler : IScheduler
  {
      // Exception caught from a scheduled action, if any.
      internal Exception _exception;

      // Set after an exception has been stored in _exception.
      private readonly ManualResetEvent _evt;

      public MyScheduler(ManualResetEvent e)
      {
          _evt = e;
      }

      public DateTimeOffset Now
      {
          get { throw new NotImplementedException(); }
      }

      public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
      {
          IDisposable result;
          try
          {
              result = action(this, state);
          }
          catch (Exception ex)
          {
              // Store the failure first, then signal, and hand back a no-op disposable.
              _exception = ex;
              _evt.Set();
              result = Disposable.Empty;
          }
          return result;
      }

      public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          throw new NotImplementedException();
      }

      public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          throw new NotImplementedException();
      }
  }
  643. #if !NO_PERF
  [Fact]
  public void ObserveOn_LongRunning_Simple()
  {
      // Three values followed by completion are pushed through ObserveOn on a
      // TestLongRunningScheduler; all three must be observed in order.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

      var s = new Subject<int>();
      var end = new ManualResetEvent(false);
      var lst = new List<int>();

      s.ObserveOn(scheduler).Subscribe(lst.Add, () => end.Set());

      s.OnNext(1);
      s.OnNext(2);
      s.OnNext(3);
      s.OnCompleted();

      // Only read the list after OnCompleted has been delivered.
      end.WaitOne();

      // Compare the collections directly so a failure reports the actual
      // contents instead of a bare "expected True".
      Assert.Equal(new[] { 1, 2, 3 }, lst);
  }
  [Fact]
  public void ObserveOn_LongRunning_Error()
  {
      // Three values followed by an error are pushed through ObserveOn on a
      // TestLongRunningScheduler; the same exception instance must be observed.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

      var subject = new Subject<int>();
      var errorObserved = new ManualResetEvent(false);
      Exception observedError = null;

      subject.ObserveOn(scheduler).Subscribe(
          _ => { },
          error =>
          {
              observedError = error;
              errorObserved.Set();
          });

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnNext(3);

      var expected = new Exception();
      subject.OnError(expected);

      errorObserved.WaitOne();
      Assert.Same(expected, observedError);
  }
  679. #if !NO_THREAD
  [Fact]
  public void ObserveOn_LongRunning_TimeVariance()
  {
      // Pushes 1000 values with a short pause before every 100th one; the test
      // passes when completion is eventually observed.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

      var subject = new Subject<int>();
      var completed = new ManualResetEvent(false);

      subject.ObserveOn(scheduler).Subscribe(_ => { }, () => completed.Set());

      subject.OnNext(1); // Ensure active
      started.WaitOne();

      Thread.Sleep(100); // Try to enter the dispatcher event wait state

      for (int value = 0; value < 1000; value++)
      {
          bool pauseFirst = value % 100 == 0;
          if (pauseFirst)
          {
              Thread.Sleep(10);
          }

          subject.OnNext(value);
      }

      subject.OnCompleted();
      completed.WaitOne();
  }
  701. #endif
  [Fact]
  public void ObserveOn_LongRunning_HoldUpDuringDispatchAndFail()
  {
      // The observer blocks inside OnNext while two more values and an error
      // are pushed. Once released, all three values and then the error must be
      // observed.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

      var s = new Subject<int>();
      var onNext = new ManualResetEvent(false);
      var resume = new ManualResetEvent(false);
      var lst = new List<int>();
      var err = default(Exception);
      var end = new ManualResetEvent(false);

      s.ObserveOn(scheduler).Subscribe(
          x => { lst.Add(x); onNext.Set(); resume.WaitOne(); },
          ex_ => { err = ex_; end.Set(); });

      s.OnNext(1);

      // Wait until the observer is inside its first OnNext (and blocked on resume).
      onNext.WaitOne();

      s.OnNext(2);
      s.OnNext(3);
      var ex = new Exception();
      s.OnError(ex);

      // Release the observer and wait for the error to arrive.
      resume.Set();
      end.WaitOne();

      // Compare the collections directly so a failure reports the actual
      // contents instead of a bare "expected True".
      Assert.Equal(new[] { 1, 2, 3 }, lst);
      Assert.Same(ex, err);
  }
  [Fact]
  public void ObserveOn_LongRunning_Cancel()
  {
      // After the subscription is disposed and the scheduler has reported
      // "stopped", a further value pushed into the subject must not be observed.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e);

      var s = new Subject<int>();
      var lst = new List<int>();
      var running = new ManualResetEvent(false);

      var d = s.ObserveOn(scheduler).Subscribe(x => { lst.Add(x); running.Set(); });

      s.OnNext(0);
      started.WaitOne();

      s.OnNext(1);
      s.OnNext(2);
      s.OnNext(3);

      // Make sure at least one value was observed before cancelling.
      running.WaitOne();

      d.Dispose();
      stopped.WaitOne();

      s.OnNext(4);

      // Two separate assertions so a failure shows which condition broke.
      Assert.NotEmpty(lst);
      Assert.DoesNotContain(4, lst);
  }
  [Fact]
  public void ObserveOn_LongRunning_OnNextThrows()
  {
      // The observer throws when it receives the value 3; the exception is
      // expected to reach the callback passed as the scheduler's third argument.
      var started = default(ManualResetEvent);
      var stopped = default(ManualResetEvent);
      var exception = default(Exception);
      var scheduler = new TestLongRunningScheduler(e => started = e, e => stopped = e, ex => exception = ex);

      var s = new Subject<int>();
      var lst = new List<int>();
      var running = new ManualResetEvent(false);

      // The subscription handle is not needed: this test never disposes it.
      s.ObserveOn(scheduler).Subscribe(x =>
      {
          lst.Add(x);
          running.Set();
          if (x == 3)
          {
              throw new Exception();
          }
      });

      s.OnNext(0);
      started.WaitOne();

      s.OnNext(1);
      s.OnNext(2);
      s.OnNext(3);
      running.WaitOne();

      s.OnNext(4);
      stopped.WaitOne();

      Assert.NotNull(exception);
  }
  770. #endif
  [Fact]
  public void ObserveOn_SynchronizationContext_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      // MyCtx forwards Post calls to the test scheduler.
      var context = new MyCtx(testScheduler);
      var observed = testScheduler.Start(() => source.ObserveOn(context));

      // The values at 90 and 120 are not expected; every other notification is
      // expected one tick after its source time.
      observed.Messages.AssertEqual(
          OnNext(231, 3),
          OnNext(241, 4),
          OnNext(311, 5),
          OnNext(471, 6),
          OnCompleted<int>(531)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 531)
      );
  }
  798. #endregion
  799. #region SubscribeOn
  [Fact]
  public void SubscribeOn_Scheduler_ArgumentChecking()
  {
      // SubscribeOn(source, scheduler) must reject a null source and a null scheduler.
      IObservable<int> nullSource = null;
      IScheduler nullScheduler = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn(nullSource, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn(DummyObservable<int>.Instance, nullScheduler));
  }
  [Fact]
  public void SubscribeOn_Scheduler_Sleep()
  {
      var testScheduler = new TestScheduler();

      // Scheduler clock values captured when the subscribe function and the
      // unsubscribe action run.
      long subscribedAt = 0L;
      long disposedAt = 0L;

      var source = Observable.Create<long>(observer =>
      {
          subscribedAt = testScheduler.Clock;
          return () => { disposedAt = testScheduler.Clock; };
      });

      var observed = testScheduler.Start(() => source.SubscribeOn(testScheduler));

      // The source never emits anything.
      observed.Messages.AssertEqual(
      );

      // Both are expected one tick after the values 200 and 1000.
      Assert.Equal(201, subscribedAt);
      Assert.Equal(1001, disposedAt);
  }
  [Fact]
  public void SubscribeOn_Scheduler_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnCompleted<long>(300)
      );

      var observed = testScheduler.Start(() => source.SubscribeOn(testScheduler));

      // Completion is expected at the source time, unshifted.
      observed.Messages.AssertEqual(
          OnCompleted<long>(300)
      );

      // The subscription is expected to start at 201 and end at 301.
      source.Subscriptions.AssertEqual(
          Subscribe(201, 301)
      );
  }
  [Fact]
  public void SubscribeOn_Scheduler_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnError<int>(300, failure)
      );

      var observed = testScheduler.Start(() => source.SubscribeOn(testScheduler));

      // The error is expected at the source time, unshifted.
      observed.Messages.AssertEqual(
          OnError<int>(300, failure)
      );

      // The subscription is expected to start at 201 and end at 301.
      source.Subscriptions.AssertEqual(
          Subscribe(201, 301)
      );
  }
  [Fact]
  public void SubscribeOn_Scheduler_Dispose()
  {
      var testScheduler = new TestScheduler();

      // A hot source with no notifications at all.
      var source = testScheduler.CreateHotObservable<int>(
      );

      var observed = testScheduler.Start(() => source.SubscribeOn(testScheduler));

      observed.Messages.AssertEqual(
      );

      // The subscription is expected to start at 201 and end at 1001.
      source.Subscriptions.AssertEqual(
          Subscribe(201, 1001)
      );
  }
  [Fact]
  public void SubscribeOn_SynchronizationContext_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      // MyCtx forwards Post calls to the test scheduler.
      var context = new MyCtx(testScheduler);
      var observed = testScheduler.Start(() => source.SubscribeOn(context));

      // The values at 90 and 120 are not expected; the remaining notifications
      // are expected at their source times, unshifted.
      observed.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530)
      );

      // The subscription is expected to start at 201 and end at 531.
      source.Subscriptions.AssertEqual(
          Subscribe(201, 531)
      );
  }
  902. #endregion
  903. #region |> Helpers <|
  // SynchronizationContext whose Post schedules the callback on the scheduler
  // supplied at construction.
  class MyCtx : SynchronizationContext
  {
      private readonly IScheduler _scheduler;

      public MyCtx(IScheduler scheduler)
      {
          _scheduler = scheduler;
      }

      public override void Post(SendOrPostCallback d, object state)
      {
          // The scheduled action invokes the callback with the posted state and
          // returns a no-op disposable.
          _scheduler.Schedule(state, (self, posted) =>
          {
              d(posted);
              return Disposable.Empty;
          });
      }
  }
  916. #endregion
  917. }
  918. }