1
0

DispatcherObservable.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !WINDOWS
  3. using System.Reactive.Concurrency;
  4. using System.Windows;
  5. using System.Windows.Threading;
  6. namespace System.Reactive.Linq
  7. {
  8. /// <summary>
  9. /// Provides a set of extension methods for scheduling actions performed through observable sequences on UI dispatchers.
  10. /// </summary>
  11. public static class DispatcherObservable
  12. {
  13. #region ObserveOn[Dispatcher]
  14. /// <summary>
  15. /// Wraps the source sequence in order to run its observer callbacks on the specified dispatcher.
  16. /// </summary>
  17. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  18. /// <param name="source">Source sequence.</param>
  19. /// <param name="dispatcher">Dispatcher whose associated message loop is used to to notify observers on.</param>
  20. /// <returns>The source sequence whose observations happen on the specified dispatcher.</returns>
  21. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception>
  22. public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher)
  23. {
  24. if (source == null)
  25. throw new ArgumentNullException("source");
  26. if (dispatcher == null)
  27. throw new ArgumentNullException("dispatcher");
  28. return ObserveOn_<TSource>(source, dispatcher);
  29. }
  30. #if HAS_DISPATCHER_PRIORITY
  31. /// <summary>
  32. /// Wraps the source sequence in order to run its observer callbacks on the specified dispatcher.
  33. /// </summary>
  34. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  35. /// <param name="source">Source sequence.</param>
  36. /// <param name="dispatcher">Dispatcher whose associated message loop is used to to notify observers on.</param>
  37. /// <param name="priority">Priority to schedule work items at.</param>
  38. /// <returns>The source sequence whose observations happen on the specified dispatcher.</returns>
  39. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception>
  40. public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority)
  41. {
  42. if (source == null)
  43. throw new ArgumentNullException("source");
  44. if (dispatcher == null)
  45. throw new ArgumentNullException("dispatcher");
  46. return ObserveOn_<TSource>(source, dispatcher, priority);
  47. }
  48. #endif
  49. /// <summary>
  50. /// Wraps the source sequence in order to run its observer callbacks on the specified dispatcher scheduler.
  51. /// </summary>
  52. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  53. /// <param name="source">Source sequence.</param>
  54. /// <param name="scheduler">Dispatcher scheduler to notify observers on.</param>
  55. /// <returns>The source sequence whose observations happen on the specified dispatcher scheduler.</returns>
  56. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  57. public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DispatcherScheduler scheduler)
  58. {
  59. if (source == null)
  60. throw new ArgumentNullException("source");
  61. if (scheduler == null)
  62. throw new ArgumentNullException("scheduler");
  63. #if HAS_DISPATCHER_PRIORITY
  64. return ObserveOn_<TSource>(source, scheduler.Dispatcher, scheduler.Priority);
  65. #else
  66. return ObserveOn_<TSource>(source, scheduler.Dispatcher);
  67. #endif
  68. }
  69. #if USE_SL_DISPATCHER
  70. /// <summary>
  71. /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the specified object.
  72. /// </summary>
  73. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  74. /// <param name="source">Source sequence.</param>
  75. /// <param name="dependencyObject">Object to get the dispatcher from.</param>
  76. /// <returns>The source sequence whose observations happen on the specified object's dispatcher.</returns>
  77. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception>
  78. public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject)
  79. {
  80. if (source == null)
  81. throw new ArgumentNullException("source");
  82. if (dependencyObject == null)
  83. throw new ArgumentNullException("dependencyObject");
  84. return ObserveOn_<TSource>(source, dependencyObject.Dispatcher);
  85. }
  86. #else
  87. /// <summary>
  88. /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the specified object.
  89. /// </summary>
  90. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  91. /// <param name="source">Source sequence.</param>
  92. /// <param name="dispatcherObject">Object to get the dispatcher from.</param>
  93. /// <returns>The source sequence whose observations happen on the specified object's dispatcher.</returns>
  94. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception>
  95. public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject)
  96. {
  97. if (source == null)
  98. throw new ArgumentNullException("source");
  99. if (dispatcherObject == null)
  100. throw new ArgumentNullException("dispatcherObject");
  101. return ObserveOn_<TSource>(source, dispatcherObject.Dispatcher);
  102. }
  103. #endif
  104. #if HAS_DISPATCHER_PRIORITY
  105. /// <summary>
  106. /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the specified object.
  107. /// </summary>
  108. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  109. /// <param name="source">Source sequence.</param>
  110. /// <param name="dispatcherObject">Object to get the dispatcher from.</param>
  111. /// <param name="priority">Priority to schedule work items at.</param>
  112. /// <returns>The source sequence whose observations happen on the specified object's dispatcher.</returns>
  113. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception>
  114. public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject, DispatcherPriority priority)
  115. {
  116. if (source == null)
  117. throw new ArgumentNullException("source");
  118. if (dispatcherObject == null)
  119. throw new ArgumentNullException("dispatcherObject");
  120. return ObserveOn_<TSource>(source, dispatcherObject.Dispatcher, priority);
  121. }
  122. #endif
  123. /// <summary>
  124. /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the current thread.
  125. /// </summary>
  126. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  127. /// <param name="source">Source sequence.</param>
  128. /// <returns>The source sequence whose observations happen on the current thread's dispatcher.</returns>
  129. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  130. public static IObservable<TSource> ObserveOnDispatcher<TSource>(this IObservable<TSource> source)
  131. {
  132. if (source == null)
  133. throw new ArgumentNullException("source");
  134. #if USE_SL_DISPATCHER
  135. return ObserveOn_<TSource>(source, System.Windows.Deployment.Current.Dispatcher);
  136. #else
  137. return ObserveOn_<TSource>(source, DispatcherScheduler.Current.Dispatcher);
  138. #endif
  139. }
  140. #if HAS_DISPATCHER_PRIORITY
  141. /// <summary>
  142. /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the current thread.
  143. /// </summary>
  144. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  145. /// <param name="source">Source sequence.</param>
  146. /// <param name="priority">Priority to schedule work items at.</param>
  147. /// <returns>The source sequence whose observations happen on the current thread's dispatcher.</returns>
  148. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  149. public static IObservable<TSource> ObserveOnDispatcher<TSource>(this IObservable<TSource> source, DispatcherPriority priority)
  150. {
  151. if (source == null)
  152. throw new ArgumentNullException("source");
  153. return ObserveOn_<TSource>(source, DispatcherScheduler.Current.Dispatcher, priority);
  154. }
  155. private static IObservable<TSource> ObserveOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority)
  156. {
  157. return Synchronization.ObserveOn(source, new DispatcherSynchronizationContext(dispatcher, priority));
  158. }
  159. #endif
  160. private static IObservable<TSource> ObserveOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher)
  161. {
  162. return Synchronization.ObserveOn(source, new DispatcherSynchronizationContext(dispatcher));
  163. }
  164. #endregion
  165. #region SubscribeOn[Dispatcher]
  166. /// <summary>
  167. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified dispatcher.
  168. /// </summary>
  169. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  170. /// <param name="source">Source sequence.</param>
  171. /// <param name="dispatcher">Dispatcher whose associated message loop is used to to perform subscription and unsubscription actions on.</param>
  172. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified dispatcher.</returns>
  173. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception>
  174. /// <remarks>
  175. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified dispatcher.
  176. /// In order to invoke observer callbacks on the specified dispatcher, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, Dispatcher)"/>.
  177. /// </remarks>
  178. public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher)
  179. {
  180. if (source == null)
  181. throw new ArgumentNullException("source");
  182. if (dispatcher == null)
  183. throw new ArgumentNullException("dispatcher");
  184. return SubscribeOn_<TSource>(source, dispatcher);
  185. }
  186. #if HAS_DISPATCHER_PRIORITY
  187. /// <summary>
  188. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified dispatcher.
  189. /// </summary>
  190. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  191. /// <param name="source">Source sequence.</param>
  192. /// <param name="dispatcher">Dispatcher whose associated message loop is used to to perform subscription and unsubscription actions on.</param>
  193. /// <param name="priority">Priority to schedule work items at.</param>
  194. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified dispatcher.</returns>
  195. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception>
  196. /// <remarks>
  197. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified dispatcher.
  198. /// In order to invoke observer callbacks on the specified dispatcher, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, Dispatcher, DispatcherPriority)"/>.
  199. /// </remarks>
  200. public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority)
  201. {
  202. if (source == null)
  203. throw new ArgumentNullException("source");
  204. if (dispatcher == null)
  205. throw new ArgumentNullException("dispatcher");
  206. return SubscribeOn_<TSource>(source, dispatcher, priority);
  207. }
  208. #endif
  209. /// <summary>
  210. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified dispatcher scheduler.
  211. /// </summary>
  212. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  213. /// <param name="source">Source sequence.</param>
  214. /// <param name="scheduler">Dispatcher scheduler to perform subscription and unsubscription actions on.</param>
  215. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified dispatcher scheduler.</returns>
  216. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  217. /// <remarks>
  218. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler.
  219. /// In order to invoke observer callbacks on the specified scheduler, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DispatcherScheduler)"/>.
  220. /// </remarks>
  221. public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DispatcherScheduler scheduler)
  222. {
  223. if (source == null)
  224. throw new ArgumentNullException("source");
  225. if (scheduler == null)
  226. throw new ArgumentNullException("scheduler");
  227. #if HAS_DISPATCHER_PRIORITY
  228. return SubscribeOn_<TSource>(source, scheduler.Dispatcher, scheduler.Priority);
  229. #else
  230. return SubscribeOn_<TSource>(source, scheduler.Dispatcher);
  231. #endif
  232. }
  233. #if USE_SL_DISPATCHER
  234. /// <summary>
  235. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the specified object.
  236. /// </summary>
  237. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  238. /// <param name="source">Source sequence.</param>
  239. /// <param name="dependencyObject">Object to get the dispatcher from.</param>
  240. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns>
  241. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception>
  242. /// <remarks>
  243. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the specified object.
  244. /// In order to invoke observer callbacks on the dispatcher associated with the specified object, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DependencyObject)"/>.
  245. /// </remarks>
  246. public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject)
  247. {
  248. if (source == null)
  249. throw new ArgumentNullException("source");
  250. if (dependencyObject == null)
  251. throw new ArgumentNullException("dependencyObject");
  252. return SubscribeOn_<TSource>(source, dependencyObject.Dispatcher);
  253. }
  254. #else
  255. /// <summary>
  256. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the specified object.
  257. /// </summary>
  258. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  259. /// <param name="source">Source sequence.</param>
  260. /// <param name="dispatcherObject">Object to get the dispatcher from.</param>
  261. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns>
  262. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception>
  263. /// <remarks>
  264. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the specified object.
  265. /// In order to invoke observer callbacks on the dispatcher associated with the specified object, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DispatcherObject)"/>.
  266. /// </remarks>
  267. public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject)
  268. {
  269. if (source == null)
  270. throw new ArgumentNullException("source");
  271. if (dispatcherObject == null)
  272. throw new ArgumentNullException("dispatcherObject");
  273. return SubscribeOn_<TSource>(source, dispatcherObject.Dispatcher);
  274. }
  275. #endif
  276. #if HAS_DISPATCHER_PRIORITY
  277. /// <summary>
  278. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the specified object.
  279. /// </summary>
  280. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  281. /// <param name="source">Source sequence.</param>
  282. /// <param name="dispatcherObject">Object to get the dispatcher from.</param>
  283. /// <param name="priority">Priority to schedule work items at.</param>
  284. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns>
  285. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception>
  286. /// <remarks>
  287. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the specified object.
  288. /// In order to invoke observer callbacks on the dispatcher associated with the specified object, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DispatcherObject, DispatcherPriority)"/>.
  289. /// </remarks>
  290. public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject, DispatcherPriority priority)
  291. {
  292. if (source == null)
  293. throw new ArgumentNullException("source");
  294. if (dispatcherObject == null)
  295. throw new ArgumentNullException("dispatcherObject");
  296. return SubscribeOn_<TSource>(source, dispatcherObject.Dispatcher, priority);
  297. }
  298. #endif
  299. /// <summary>
  300. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the current thread.
  301. /// </summary>
  302. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  303. /// <param name="source">Source sequence.</param>
  304. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the current thread's dispatcher.</returns>
  305. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  306. /// <remarks>
  307. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the current thread.
  308. /// In order to invoke observer callbacks on the dispatcher associated with the current thread, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOnDispatcher{TSource}(IObservable{TSource})"/>.
  309. /// </remarks>
  310. public static IObservable<TSource> SubscribeOnDispatcher<TSource>(this IObservable<TSource> source)
  311. {
  312. if (source == null)
  313. throw new ArgumentNullException("source");
  314. #if USE_SL_DISPATCHER
  315. return SubscribeOn_<TSource>(source, System.Windows.Deployment.Current.Dispatcher);
  316. #else
  317. return SubscribeOn_<TSource>(source, DispatcherScheduler.Current.Dispatcher);
  318. #endif
  319. }
  320. #if HAS_DISPATCHER_PRIORITY
  321. /// <summary>
  322. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the current thread.
  323. /// </summary>
  324. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  325. /// <param name="source">Source sequence.</param>
  326. /// <param name="priority">Priority to schedule work items at.</param>
  327. /// <returns>The source sequence whose observations happen on the current thread's dispatcher.</returns>
  328. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  329. /// <remarks>
  330. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the current thread.
  331. /// In order to invoke observer callbacks on the dispatcher associated with the current thread, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOnDispatcher{TSource}(IObservable{TSource}, DispatcherPriority)"/>.
  332. /// </remarks>
  333. public static IObservable<TSource> SubscribeOnDispatcher<TSource>(this IObservable<TSource> source, DispatcherPriority priority)
  334. {
  335. if (source == null)
  336. throw new ArgumentNullException("source");
  337. return SubscribeOn_<TSource>(source, DispatcherScheduler.Current.Dispatcher, priority);
  338. }
  339. private static IObservable<TSource> SubscribeOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority)
  340. {
  341. return Synchronization.SubscribeOn(source, new DispatcherSynchronizationContext(dispatcher, priority));
  342. }
  343. #endif
  344. private static IObservable<TSource> SubscribeOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher)
  345. {
  346. return Synchronization.SubscribeOn(source, new DispatcherSynchronizationContext(dispatcher));
  347. }
  348. #endregion
  349. }
  350. }
  351. #endif