// Synchronization.cs
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.ComponentModel;
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. namespace System.Reactive.Concurrency
  7. {
  8. /// <summary>
  9. /// Provides basic synchronization and scheduling services for observable sequences.
  10. /// </summary>
  11. [EditorBrowsable(EditorBrowsableState.Advanced)]
  12. public static class Synchronization
  13. {
  14. #region SubscribeOn
  /// <summary>
  /// Produces a sequence that performs the subscribe and unsubscribe side-effects of the source sequence on the given scheduler.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose subscription logic is moved onto the scheduler.</param>
  /// <param name="scheduler">Scheduler on which subscribing and unsubscribing take place.</param>
  /// <returns>A sequence whose subscriptions and unsubscriptions are carried out on the given scheduler.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <remarks>
  /// This operator does not redirect observer callbacks; it only affects where subscribing and disposing happen.
  /// To have observer callbacks invoked on a scheduler, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      return new AnonymousObservable<TSource>(observer =>
      {
          // The returned serial disposable first tracks the scheduled work item;
          // once that work runs it is re-pointed at the actual subscription.
          var scheduledWork = new SingleAssignmentDisposable();
          var subscription = new SerialDisposable { Disposable = scheduledWork };

          scheduledWork.Disposable = scheduler.Schedule(() =>
          {
              var inner = source.SubscribeSafe(observer);

              // Wrap the inner subscription together with the scheduler so that
              // unsubscription is tied to the same scheduler as subscription.
              subscription.Disposable = new ScheduledDisposable(scheduler, inner);
          });

          return subscription;
      });
  }
  45. #if !NO_SYNCCTX
  /// <summary>
  /// Produces a sequence that performs the subscribe and unsubscribe side-effects of the source sequence on the given synchronization context.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose subscription logic is moved onto the synchronization context.</param>
  /// <param name="context">Synchronization context on which subscribing and unsubscribing take place.</param>
  /// <returns>A sequence whose subscriptions and unsubscriptions are carried out on the given synchronization context.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
  /// <remarks>
  /// This operator does not redirect observer callbacks; it only affects where subscribing and disposing happen.
  /// To have observer callbacks posted to a synchronization context (for example one representing a UI thread),
  /// use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
  /// </remarks>
  public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (context == null)
      {
          throw new ArgumentNullException("context");
      }

      return new AnonymousObservable<TSource>(observer =>
      {
          var pending = new SingleAssignmentDisposable();

          context.PostWithStartComplete(() =>
          {
              // The caller may have disposed before the posted action ran;
              // in that case do not subscribe to the source at all.
              if (pending.IsDisposed)
              {
                  return;
              }

              pending.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
          });

          return pending;
      });
  }
  75. #endif
  76. #endregion
  77. #region ObserveOn
  /// <summary>
  /// Produces a sequence whose observer callbacks are run on the given scheduler.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose notifications are to be redirected.</param>
  /// <param name="scheduler">Scheduler on which observers are notified.</param>
  /// <returns>A sequence whose observations happen on the given scheduler.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

#if !NO_PERF
      return new ObserveOn<TSource>(source, scheduler);
#else
      // Fallback build: subscribe to the source through an observer bound to the scheduler.
      return new AnonymousObservable<TSource>(observer =>
      {
          var relay = new ObserveOnObserver<TSource>(scheduler, observer, null);
          return source.Subscribe(relay);
      });
#endif
  }
  98. #if !NO_SYNCCTX
  /// <summary>
  /// Produces a sequence whose observer callbacks are run on the given synchronization context.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose notifications are to be redirected.</param>
  /// <param name="context">Synchronization context on which observers are notified.</param>
  /// <returns>A sequence whose observations happen on the given synchronization context.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
  public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (context == null)
      {
          throw new ArgumentNullException("context");
      }

#if !NO_PERF
      return new ObserveOn<TSource>(source, context);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          // Bracket the subscription with OperationStarted / OperationCompleted on the context.
          context.OperationStarted();

          // Every notification is posted to the context instead of being delivered inline.
          var subscription = source.Subscribe(
              value => context.Post(_ => observer.OnNext(value), null),
              error => context.Post(_ => observer.OnError(error), null),
              () => context.Post(_ => observer.OnCompleted(), null));

          return subscription.Finally(() => context.OperationCompleted());
      });
#endif
  }
  139. #endif
  140. #endregion
  141. #region Synchronize
  /// <summary>
  /// Produces a sequence whose outgoing observer callbacks are serialized.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose notifications are to be synchronized.</param>
  /// <returns>A sequence whose outgoing calls to observers are synchronized.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

#if !NO_PERF
      return new Synchronize<TSource>(source);
#else
      // Fallback build: each subscription gets its own freshly created gate object.
      return new AnonymousObservable<TSource>(observer =>
          source.Subscribe(Observer.Synchronize(observer, new object())));
#endif
  }
  /// <summary>
  /// Produces a sequence whose outgoing observer callbacks are synchronized on a caller-supplied gate object.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose notifications are to be synchronized.</param>
  /// <param name="gate">Object on which every observer call is synchronized.</param>
  /// <returns>A sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
  public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (gate == null)
      {
          throw new ArgumentNullException("gate");
      }

#if !NO_PERF
      return new Synchronize<TSource>(source, gate);
#else
      // Fallback build: all subscriptions share the gate passed in by the caller.
      return new AnonymousObservable<TSource>(observer =>
          source.Subscribe(Observer.Synchronize(observer, gate)));
#endif
  }
  186. #endregion
  187. }
  188. }