SubscribeOnTest.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Linq;
  7. using System.Threading;
  8. using Microsoft.Reactive.Testing;
  9. using ReactiveTests.Dummies;
  10. using Xunit;
  11. #if HAS_DISPATCHER
  12. using System.Windows.Threading;
  13. using System.Reactive;
  14. using System.Reactive.Subjects;
  15. #endif
  16. #if HAS_WINFORMS
  17. using System.Windows.Forms;
  18. #endif
  19. namespace ReactiveTests.Tests
  20. {
  21. public class SubscribeOnTest : TestBase
  22. {
  23. #region + TestBase +
  /// <summary>
  /// Asserts that the SubscribeOn overloads compiled into this build are expected to
  /// throw <see cref="ArgumentNullException"/> when the source or the target
  /// (scheduler, control, dispatcher or synchronization context) is null.
  /// </summary>
  [Fact]
  public void SubscribeOn_ArgumentChecking()
  {
      var source = Observable.Empty<int>();

  #if HAS_WINFORMS
      // Windows Forms: ControlScheduler overload and the Control-based extension.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(default(IObservable<int>), new ControlScheduler(new Label())));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(source, default(ControlScheduler)));

      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.SubscribeOn<int>(default(IObservable<int>), new Label()));
      ReactiveAssert.Throws<ArgumentNullException>(() => ControlObservable.SubscribeOn<int>(source, default(Label)));
  #endif

  #if HAS_DISPATCHER
      // Resolve the dispatcher once; only its origin differs between the two flavors.
  #if USE_SL_DISPATCHER
      var currentDispatcher = System.Windows.Deployment.Current.Dispatcher;
  #else
      var currentDispatcher = Dispatcher.CurrentDispatcher;
  #endif

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(default(IObservable<int>), new DispatcherScheduler(currentDispatcher)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(source, default(DispatcherScheduler)));

      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOn<int>(default(IObservable<int>), currentDispatcher));
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOn<int>(source, default(Dispatcher)));
      ReactiveAssert.Throws<ArgumentNullException>(() => DispatcherObservable.SubscribeOnDispatcher<int>(default(IObservable<int>)));
  #endif

      // SynchronizationContext overload is checked in every build.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(default(IObservable<int>), new SynchronizationContext()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn<int>(source, default(SynchronizationContext)));
  }
  52. #if HAS_WINFORMS
  /// <summary>
  /// Subscribes through <c>SubscribeOn(Control)</c> and checks that both the subscribe
  /// callback and the unsubscribe callback observe a
  /// <c>WindowsFormsSynchronizationContext</c> as the current synchronization context.
  /// </summary>
  [Fact]
  public void SubscribeOn_Control()
  {
      var lbl = CreateLabel();

      // Not disposed on purpose: the callbacks below call Set() from the UI thread
      // and could still run after this method leaves on a failure path.
      var subscribed = new ManualResetEvent(false);
      var unsubscribed = new ManualResetEvent(false);

      var okay = true;

      try
      {
          var d = Observable.Create<int>(obs =>
          {
              lbl.Text = "Subscribe";
              okay &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
              subscribed.Set();

              return () =>
              {
                  lbl.Text = "Unsubscribe";
                  okay &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
                  unsubscribed.Set();
              };
          })
          .SubscribeOn(lbl)
          .Subscribe(_ => { });

          // Wait for the subscribe side effect before disposing, then for the
          // unsubscribe side effect before inspecting the result.
          subscribed.WaitOne();
          d.Dispose();
          unsubscribed.WaitOne();
      }
      finally
      {
          // Stop the message loop started by CreateLabel even when subscribing or
          // disposing throws; otherwise its thread keeps running after the test.
          Application.Exit();
      }

      Assert.True(okay);
  }
  /// <summary>
  /// Subscribes through <c>SubscribeOn(new ControlScheduler(label))</c> and checks that
  /// both the subscribe callback and the unsubscribe callback observe a
  /// <c>WindowsFormsSynchronizationContext</c> as the current synchronization context.
  /// </summary>
  [Fact]
  public void SubscribeOn_ControlScheduler()
  {
      var lbl = CreateLabel();

      // Not disposed on purpose: the callbacks below call Set() from the UI thread
      // and could still run after this method leaves on a failure path.
      var subscribed = new ManualResetEvent(false);
      var unsubscribed = new ManualResetEvent(false);

      var okay = true;

      try
      {
          var d = Observable.Create<int>(obs =>
          {
              lbl.Text = "Subscribe";
              okay &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
              subscribed.Set();

              return () =>
              {
                  lbl.Text = "Unsubscribe";
                  okay &= (SynchronizationContext.Current is System.Windows.Forms.WindowsFormsSynchronizationContext);
                  unsubscribed.Set();
              };
          })
          .SubscribeOn(new ControlScheduler(lbl))
          .Subscribe(_ => { });

          // Wait for the subscribe side effect before disposing, then for the
          // unsubscribe side effect before inspecting the result.
          subscribed.WaitOne();
          d.Dispose();
          unsubscribed.WaitOne();
      }
      finally
      {
          // Stop the message loop started by CreateLabel even when subscribing or
          // disposing throws; otherwise its thread keeps running after the test.
          Application.Exit();
      }

      Assert.True(okay);
  }
  /// <summary>
  /// Creates a <c>Label</c> hosted in a borderless, zero-sized form whose message loop
  /// runs on a dedicated STA thread, and blocks until the form has raised Load.
  /// </summary>
  /// <returns>The label created on the UI thread.</returns>
  private Label CreateLabel()
  {
      var lbl = default(Label);

      // The event is only needed until the form reports that it has loaded.
      using (var loaded = new ManualResetEvent(false))
      {
          var uiThread = new Thread(() =>
          {
              lbl = new Label();

              var frm = new Form { Controls = { lbl }, Width = 0, Height = 0, FormBorderStyle = FormBorderStyle.None, ShowInTaskbar = false };
              frm.Load += (_, __) => loaded.Set();

              // Pumps messages until the application is asked to exit.
              Application.Run(frm);
          });

          uiThread.SetApartmentState(ApartmentState.STA);

          // A background thread cannot keep the test process alive if a test
          // fails before it gets the chance to shut the message loop down.
          uiThread.IsBackground = true;
          uiThread.Start();

          loaded.WaitOne();
      }

      return lbl;
  }
  126. #endif
  127. #if HAS_DISPATCHER
  /// <summary>
  /// Subscribes through <c>SubscribeOn(dispatcher)</c> and asserts that the subscribe
  /// and unsubscribe callbacks both see a <c>DispatcherSynchronizationContext</c>
  /// as the current synchronization context.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void SubscribeOn_Dispatcher()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(evt =>
      {
          // Signalled from the subscribe callback; triggers disposal further down.
          var subscribed = new AsyncSubject<Unit>();
          var onDispatcher = true;

          var source = Observable.Create<int>(obs =>
          {
              onDispatcher &= SynchronizationContext.Current is DispatcherSynchronizationContext;
              subscribed.OnNext(Unit.Default);
              subscribed.OnCompleted();

              return () =>
              {
                  onDispatcher &= SynchronizationContext.Current is DispatcherSynchronizationContext;
                  Assert.True(onDispatcher);
                  dispatcher.InvokeShutdown();
                  evt.Set();
              };
          });

          var subscription = source
              .SubscribeOn(dispatcher)
              .Subscribe(_ => { });

          subscribed.Subscribe(_ => subscription.Dispose());
      });
  }
  /// <summary>
  /// Subscribes through <c>SubscribeOn(new DispatcherScheduler(dispatcher))</c> and
  /// asserts that the subscribe and unsubscribe callbacks both see a
  /// <c>DispatcherSynchronizationContext</c> as the current synchronization context.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void SubscribeOn_DispatcherScheduler()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(evt =>
      {
          // Signalled from the subscribe callback; triggers disposal further down.
          var subscribed = new AsyncSubject<Unit>();
          var onDispatcher = true;

          var source = Observable.Create<int>(obs =>
          {
              onDispatcher &= SynchronizationContext.Current is DispatcherSynchronizationContext;
              subscribed.OnNext(Unit.Default);
              subscribed.OnCompleted();

              return () =>
              {
                  onDispatcher &= SynchronizationContext.Current is DispatcherSynchronizationContext;
                  Assert.True(onDispatcher);
                  dispatcher.InvokeShutdown();
                  evt.Set();
              };
          });

          var subscription = source
              .SubscribeOn(new DispatcherScheduler(dispatcher))
              .Subscribe(_ => { });

          subscribed.Subscribe(_ => subscription.Dispose());
      });
  }
  /// <summary>
  /// Queues work onto the dispatcher that subscribes through
  /// <c>SubscribeOnDispatcher()</c>, and asserts that the subscribe and unsubscribe
  /// callbacks both see a <c>DispatcherSynchronizationContext</c> as the current
  /// synchronization context.
  /// </summary>
  [Fact]
  [Asynchronous]
  public void SubscribeOn_CurrentDispatcher()
  {
      var dispatcher = DispatcherHelpers.EnsureDispatcher();

      RunAsync(evt =>
      {
          // Signalled from the subscribe callback; triggers disposal further down.
          var subscribed = new AsyncSubject<Unit>();
          var onDispatcher = true;

          Action subscribeFromDispatcher = () =>
          {
              var source = Observable.Create<int>(obs =>
              {
                  onDispatcher &= SynchronizationContext.Current is DispatcherSynchronizationContext;
                  subscribed.OnNext(Unit.Default);
                  subscribed.OnCompleted();

                  return () =>
                  {
                      onDispatcher &= SynchronizationContext.Current is DispatcherSynchronizationContext;
                      Assert.True(onDispatcher);
                      dispatcher.InvokeShutdown();
                      evt.Set();
                  };
              });

              var subscription = source
                  .SubscribeOnDispatcher()
                  .Subscribe(_ => { });

              subscribed.Subscribe(_ => subscription.Dispose());
          };

          dispatcher.BeginInvoke(subscribeFromDispatcher);
      });
  }
  212. #endif
  213. #endregion + TestBase +
  214. }
  215. public class SubscribeOnReactiveTest : ReactiveTest
  216. {
  /// <summary>
  /// Asserts that <c>Observable.SubscribeOn</c> with a scheduler is expected to throw
  /// <see cref="ArgumentNullException"/> for a null source and for a null scheduler.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      IScheduler nullScheduler = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn(nullSource, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SubscribeOn(DummyObservable<int>.Instance, nullScheduler));
  }
  /// <summary>
  /// Records the scheduler clock when the source is subscribed to and when the
  /// subscription is disposed, expecting 201 and 1001 with no messages observed.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_Sleep()
  {
      var scheduler = new TestScheduler();

      long subscribedAt = 0;
      long disposedAt = 0;

      // A source that never emits; it only captures the clock at both ends.
      var xs = Observable.Create<long>(observer =>
      {
          subscribedAt = scheduler.Clock;
          return () => { disposedAt = scheduler.Clock; };
      });

      var results = scheduler.Start(() => xs.SubscribeOn(scheduler));

      results.Messages.AssertEqual();

      Assert.Equal(201, subscribedAt);
      Assert.Equal(1001, disposedAt);
  }
  /// <summary>
  /// A hot source that completes at 300 is expected to relay the completion at 300
  /// and to be subscribed from 201 to 301.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_Completed()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(OnCompleted<long>(300));

      var observed = scheduler.Start(() => source.SubscribeOn(scheduler));

      observed.Messages.AssertEqual(OnCompleted<long>(300));

      source.Subscriptions.AssertEqual(Subscribe(201, 301));
  }
  /// <summary>
  /// A hot source that fails at 300 is expected to relay the same exception at 300
  /// and to be subscribed from 201 to 301.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_Error()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var source = scheduler.CreateHotObservable(OnError<int>(300, error));

      var observed = scheduler.Start(() => source.SubscribeOn(scheduler));

      observed.Messages.AssertEqual(OnError<int>(300, error));

      source.Subscriptions.AssertEqual(Subscribe(201, 301));
  }
  /// <summary>
  /// A hot source with no notifications is expected to produce no messages and to
  /// stay subscribed from 201 until 1001.
  /// </summary>
  [Fact]
  public void SubscribeOn_Scheduler_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable<int>();

      var observed = scheduler.Start(() => source.SubscribeOn(scheduler));

      observed.Messages.AssertEqual();

      source.Subscriptions.AssertEqual(Subscribe(201, 1001));
  }
  /// <summary>
  /// Subscribes to a hot source through <c>SubscribeOn(new MyCtx(scheduler))</c> and
  /// expects only the notifications from 230 onward to be observed, with the source
  /// subscribed from 201 to 531.
  /// </summary>
  [Fact]
  public void SubscribeOn_SynchronizationContext_Simple()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, 1),
          OnNext(120, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530));

      // NOTE(review): MyCtx is declared outside this file; presumably a
      // SynchronizationContext backed by the test scheduler - confirm.
      var observed = scheduler.Start(() => source.SubscribeOn(new MyCtx(scheduler)));

      // The values at 90 and 120 are not expected in the observed messages.
      observed.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(310, 5),
          OnNext(470, 6),
          OnCompleted<int>(530));

      source.Subscriptions.AssertEqual(Subscribe(201, 531));
  }
  319. }
  320. }