DispatcherObservable.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !WINDOWS
  5. using System.Reactive.Concurrency;
  6. using System.Windows;
  7. using System.Windows.Threading;
  8. namespace System.Reactive.Linq
  9. {
  10. /// <summary>
  11. /// Provides a set of extension methods for scheduling actions performed through observable sequences on UI dispatchers.
  12. /// </summary>
  13. public static class DispatcherObservable
  14. {
  15. #region ObserveOn[Dispatcher]
  /// <summary>
  /// Returns a sequence that relays the notifications of <paramref name="source"/> to its observers on the given dispatcher.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose observer callbacks should be moved onto the dispatcher.</param>
  /// <param name="dispatcher">Dispatcher whose associated message loop is used to notify observers.</param>
  /// <returns>A sequence whose observations happen on <paramref name="dispatcher"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception>
  public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dispatcher == null)
      {
          throw new ArgumentNullException(nameof(dispatcher));
      }

      // Arguments are validated here; the private helper does the actual wrapping.
      return ObserveOn_(source, dispatcher);
  }
  32. #if HAS_DISPATCHER_PRIORITY
  /// <summary>
  /// Returns a sequence that relays the notifications of <paramref name="source"/> to its observers on the given dispatcher,
  /// scheduling the work items at the given priority.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose observer callbacks should be moved onto the dispatcher.</param>
  /// <param name="dispatcher">Dispatcher whose associated message loop is used to notify observers.</param>
  /// <param name="priority">Dispatcher priority at which work items are scheduled.</param>
  /// <returns>A sequence whose observations happen on <paramref name="dispatcher"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception>
  public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dispatcher == null)
      {
          throw new ArgumentNullException(nameof(dispatcher));
      }

      // The priority is passed through unchanged to the priority-aware helper.
      return ObserveOn_(source, dispatcher, priority);
  }
  50. #endif
  /// <summary>
  /// Returns a sequence that relays the notifications of <paramref name="source"/> to its observers on the dispatcher
  /// exposed by the given dispatcher scheduler.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose observer callbacks should be moved onto the scheduler's dispatcher.</param>
  /// <param name="scheduler">Dispatcher scheduler to notify observers on.</param>
  /// <returns>A sequence whose observations happen on the specified dispatcher scheduler.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DispatcherScheduler scheduler)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      var target = scheduler.Dispatcher;
#if HAS_DISPATCHER_PRIORITY
      // When priorities are available, the scheduler's own priority is carried over.
      return ObserveOn_(source, target, scheduler.Priority);
#else
      return ObserveOn_(source, target);
#endif
  }
  71. #if USE_SL_DISPATCHER
  /// <summary>
  /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the specified object.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <param name="dependencyObject">Object to get the dispatcher from.</param>
  /// <returns>The source sequence whose observations happen on the specified object's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception>
  public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject)
  {
      // nameof keeps the reported parameter names in sync with the signature,
      // matching the other overloads in this class (the runtime strings are unchanged).
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dependencyObject == null)
      {
          throw new ArgumentNullException(nameof(dependencyObject));
      }

      return ObserveOn_<TSource>(source, dependencyObject.Dispatcher);
  }
  88. #else
  /// <summary>
  /// Returns a sequence that relays the notifications of <paramref name="source"/> to its observers on the dispatcher
  /// that the given object is associated with.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose observer callbacks should be moved onto the object's dispatcher.</param>
  /// <param name="dispatcherObject">Object to get the dispatcher from.</param>
  /// <returns>A sequence whose observations happen on the specified object's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception>
  public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dispatcherObject == null)
      {
          throw new ArgumentNullException(nameof(dispatcherObject));
      }

      // Only the object's dispatcher is needed from here on.
      var target = dispatcherObject.Dispatcher;
      return ObserveOn_(source, target);
  }
  105. #endif
  106. #if HAS_DISPATCHER_PRIORITY
  /// <summary>
  /// Returns a sequence that relays the notifications of <paramref name="source"/> to its observers on the dispatcher
  /// that the given object is associated with, scheduling the work items at the given priority.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose observer callbacks should be moved onto the object's dispatcher.</param>
  /// <param name="dispatcherObject">Object to get the dispatcher from.</param>
  /// <param name="priority">Dispatcher priority at which work items are scheduled.</param>
  /// <returns>A sequence whose observations happen on the specified object's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception>
  public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject, DispatcherPriority priority)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dispatcherObject == null)
      {
          throw new ArgumentNullException(nameof(dispatcherObject));
      }

      // Only the object's dispatcher is needed from here on; the priority is passed through unchanged.
      var target = dispatcherObject.Dispatcher;
      return ObserveOn_(source, target, priority);
  }
  124. #endif
  /// <summary>
  /// Returns a sequence that relays the notifications of <paramref name="source"/> to its observers on the dispatcher
  /// associated with the current thread.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose observer callbacks should be moved onto the current dispatcher.</param>
  /// <returns>A sequence whose observations happen on the current thread's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TSource> ObserveOnDispatcher<TSource>(this IObservable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

#if USE_SL_DISPATCHER
      // Builds that define USE_SL_DISPATCHER take the dispatcher from the current Deployment.
      var current = System.Windows.Deployment.Current.Dispatcher;
#else
      // Otherwise the dispatcher comes from the current DispatcherScheduler.
      var current = DispatcherScheduler.Current.Dispatcher;
#endif
      return ObserveOn_(source, current);
  }
  142. #if HAS_DISPATCHER_PRIORITY
  /// <summary>
  /// Returns a sequence that relays the notifications of <paramref name="source"/> to its observers on the dispatcher
  /// associated with the current thread, scheduling the work items at the given priority.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose observer callbacks should be moved onto the current dispatcher.</param>
  /// <param name="priority">Dispatcher priority at which work items are scheduled.</param>
  /// <returns>A sequence whose observations happen on the current thread's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TSource> ObserveOnDispatcher<TSource>(this IObservable<TSource> source, DispatcherPriority priority)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // The dispatcher is taken from the current DispatcherScheduler.
      var current = DispatcherScheduler.Current.Dispatcher;
      return ObserveOn_(source, current, priority);
  }
  /// <summary>
  /// Shared implementation of the priority-aware ObserveOn overloads: builds a
  /// <see cref="DispatcherSynchronizationContext"/> from the dispatcher and priority and hands it to Synchronization.ObserveOn.
  /// Performs no argument validation of its own.
  /// </summary>
  private static IObservable<TSource> ObserveOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority)
  {
      var context = new DispatcherSynchronizationContext(dispatcher, priority);
      return Synchronization.ObserveOn(source, context);
  }
  161. #endif
  /// <summary>
  /// Shared implementation of the ObserveOn overloads without a priority: builds a
  /// <see cref="DispatcherSynchronizationContext"/> from the dispatcher and hands it to Synchronization.ObserveOn.
  /// Performs no argument validation of its own.
  /// </summary>
  private static IObservable<TSource> ObserveOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher)
      => Synchronization.ObserveOn(source, new DispatcherSynchronizationContext(dispatcher));
  166. #endregion
  167. #region SubscribeOn[Dispatcher]
  /// <summary>
  /// Returns a sequence whose subscription and unsubscription logic for <paramref name="source"/> runs on the given dispatcher.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose subscribe and dispose side-effects should be moved onto the dispatcher.</param>
  /// <param name="dispatcher">Dispatcher whose associated message loop is used to perform subscription and unsubscription actions.</param>
  /// <returns>A sequence whose subscriptions and unsubscriptions happen on <paramref name="dispatcher"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception>
  /// <remarks>
  /// Only the side-effects of subscribing to the source sequence and of disposing those subscriptions run on the specified dispatcher.
  /// To have observer callbacks invoked on the dispatcher instead, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, Dispatcher)"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dispatcher == null)
      {
          throw new ArgumentNullException(nameof(dispatcher));
      }

      // Arguments are validated here; the private helper does the actual wrapping.
      return SubscribeOn_(source, dispatcher);
  }
  188. #if HAS_DISPATCHER_PRIORITY
  /// <summary>
  /// Returns a sequence whose subscription and unsubscription logic for <paramref name="source"/> runs on the given dispatcher,
  /// scheduling the work items at the given priority.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose subscribe and dispose side-effects should be moved onto the dispatcher.</param>
  /// <param name="dispatcher">Dispatcher whose associated message loop is used to perform subscription and unsubscription actions.</param>
  /// <param name="priority">Dispatcher priority at which work items are scheduled.</param>
  /// <returns>A sequence whose subscriptions and unsubscriptions happen on <paramref name="dispatcher"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception>
  /// <remarks>
  /// Only the side-effects of subscribing to the source sequence and of disposing those subscriptions run on the specified dispatcher.
  /// To have observer callbacks invoked on the dispatcher instead, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, Dispatcher, DispatcherPriority)"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dispatcher == null)
      {
          throw new ArgumentNullException(nameof(dispatcher));
      }

      // The priority is passed through unchanged to the priority-aware helper.
      return SubscribeOn_(source, dispatcher, priority);
  }
  210. #endif
  /// <summary>
  /// Returns a sequence whose subscription and unsubscription logic for <paramref name="source"/> runs on the dispatcher
  /// exposed by the given dispatcher scheduler.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose subscribe and dispose side-effects should be moved onto the scheduler's dispatcher.</param>
  /// <param name="scheduler">Dispatcher scheduler to perform subscription and unsubscription actions on.</param>
  /// <returns>A sequence whose subscriptions and unsubscriptions happen on the specified dispatcher scheduler.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <remarks>
  /// Only the side-effects of subscribing to the source sequence and of disposing those subscriptions run on the specified scheduler.
  /// To have observer callbacks invoked on the scheduler instead, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DispatcherScheduler)"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DispatcherScheduler scheduler)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      var target = scheduler.Dispatcher;
#if HAS_DISPATCHER_PRIORITY
      // When priorities are available, the scheduler's own priority is carried over.
      return SubscribeOn_(source, target, scheduler.Priority);
#else
      return SubscribeOn_(source, target);
#endif
  }
  235. #if USE_SL_DISPATCHER
  /// <summary>
  /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the specified object.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <param name="dependencyObject">Object to get the dispatcher from.</param>
  /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception>
  /// <remarks>
  /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the specified object.
  /// In order to invoke observer callbacks on the dispatcher associated with the specified object, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DependencyObject)"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject)
  {
      // nameof keeps the reported parameter names in sync with the signature,
      // matching the other overloads in this class (the runtime strings are unchanged).
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dependencyObject == null)
      {
          throw new ArgumentNullException(nameof(dependencyObject));
      }

      return SubscribeOn_<TSource>(source, dependencyObject.Dispatcher);
  }
  256. #else
  /// <summary>
  /// Returns a sequence whose subscription and unsubscription logic for <paramref name="source"/> runs on the dispatcher
  /// that the given object is associated with.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose subscribe and dispose side-effects should be moved onto the object's dispatcher.</param>
  /// <param name="dispatcherObject">Object to get the dispatcher from.</param>
  /// <returns>A sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception>
  /// <remarks>
  /// Only the side-effects of subscribing to the source sequence and of disposing those subscriptions run on the dispatcher associated with the specified object.
  /// To have observer callbacks invoked on that dispatcher instead, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DispatcherObject)"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dispatcherObject == null)
      {
          throw new ArgumentNullException(nameof(dispatcherObject));
      }

      // Only the object's dispatcher is needed from here on.
      var target = dispatcherObject.Dispatcher;
      return SubscribeOn_(source, target);
  }
  277. #endif
  278. #if HAS_DISPATCHER_PRIORITY
  /// <summary>
  /// Returns a sequence whose subscription and unsubscription logic for <paramref name="source"/> runs on the dispatcher
  /// that the given object is associated with, scheduling the work items at the given priority.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose subscribe and dispose side-effects should be moved onto the object's dispatcher.</param>
  /// <param name="dispatcherObject">Object to get the dispatcher from.</param>
  /// <param name="priority">Dispatcher priority at which work items are scheduled.</param>
  /// <returns>A sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception>
  /// <remarks>
  /// Only the side-effects of subscribing to the source sequence and of disposing those subscriptions run on the dispatcher associated with the specified object.
  /// To have observer callbacks invoked on that dispatcher instead, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DispatcherObject, DispatcherPriority)"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject, DispatcherPriority priority)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (dispatcherObject == null)
      {
          throw new ArgumentNullException(nameof(dispatcherObject));
      }

      // Only the object's dispatcher is needed from here on; the priority is passed through unchanged.
      var target = dispatcherObject.Dispatcher;
      return SubscribeOn_(source, target, priority);
  }
  300. #endif
  /// <summary>
  /// Returns a sequence whose subscription and unsubscription logic for <paramref name="source"/> runs on the dispatcher
  /// associated with the current thread.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose subscribe and dispose side-effects should be moved onto the current dispatcher.</param>
  /// <returns>A sequence whose subscriptions and unsubscriptions happen on the current thread's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <remarks>
  /// Only the side-effects of subscribing to the source sequence and of disposing those subscriptions run on the dispatcher associated with the current thread.
  /// To have observer callbacks invoked on that dispatcher instead, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOnDispatcher{TSource}(IObservable{TSource})"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOnDispatcher<TSource>(this IObservable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

#if USE_SL_DISPATCHER
      // Builds that define USE_SL_DISPATCHER take the dispatcher from the current Deployment.
      var current = System.Windows.Deployment.Current.Dispatcher;
#else
      // Otherwise the dispatcher comes from the current DispatcherScheduler.
      var current = DispatcherScheduler.Current.Dispatcher;
#endif
      return SubscribeOn_(source, current);
  }
  322. #if HAS_DISPATCHER_PRIORITY
  /// <summary>
  /// Returns a sequence whose subscription and unsubscription logic for <paramref name="source"/> runs on the dispatcher
  /// associated with the current thread, scheduling the work items at the given priority.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose subscribe and dispose side-effects should be moved onto the current dispatcher.</param>
  /// <param name="priority">Dispatcher priority at which work items are scheduled.</param>
  /// <returns>A sequence whose subscriptions and unsubscriptions happen on the current thread's dispatcher.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <remarks>
  /// Only the side-effects of subscribing to the source sequence and of disposing those subscriptions run on the dispatcher associated with the current thread.
  /// To have observer callbacks invoked on that dispatcher instead, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOnDispatcher{TSource}(IObservable{TSource}, DispatcherPriority)"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOnDispatcher<TSource>(this IObservable<TSource> source, DispatcherPriority priority)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // The dispatcher is taken from the current DispatcherScheduler.
      var current = DispatcherScheduler.Current.Dispatcher;
      return SubscribeOn_(source, current, priority);
  }
  /// <summary>
  /// Shared implementation of the priority-aware SubscribeOn overloads: builds a
  /// <see cref="DispatcherSynchronizationContext"/> from the dispatcher and priority and hands it to Synchronization.SubscribeOn.
  /// Performs no argument validation of its own.
  /// </summary>
  private static IObservable<TSource> SubscribeOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority)
  {
      var context = new DispatcherSynchronizationContext(dispatcher, priority);
      return Synchronization.SubscribeOn(source, context);
  }
  345. #endif
  /// <summary>
  /// Shared implementation of the SubscribeOn overloads without a priority: builds a
  /// <see cref="DispatcherSynchronizationContext"/> from the dispatcher and hands it to Synchronization.SubscribeOn.
  /// Performs no argument validation of its own.
  /// </summary>
  private static IObservable<TSource> SubscribeOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher)
      => Synchronization.SubscribeOn(source, new DispatcherSynchronizationContext(dispatcher));
  350. #endregion
  351. }
  352. }
  353. #endif