// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.ComponentModel;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  namespace System.Reactive.Concurrency
  {
      /// <summary>
      /// Static helpers offering basic scheduling and synchronization services for observable sequences:
      /// choosing where subscription side-effects run, where observer callbacks run, and serializing observer callbacks.
      /// </summary>
      [EditorBrowsable(EditorBrowsableState.Advanced)]
      public static class Synchronization
      {
          #region SubscribeOn

          /// <summary>
          /// Returns a sequence that performs the subscription to, and unsubscription from, <paramref name="source"/> on the given scheduler.
          /// </summary>
          /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
          /// <param name="source">The sequence to subscribe to.</param>
          /// <param name="scheduler">The scheduler on which subscribing and unsubscribing take place.</param>
          /// <returns>A sequence whose subscribe and unsubscribe actions are carried out on <paramref name="scheduler"/>.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
          /// <remarks>
          /// This affects only the side-effects of subscribing to and disposing subscriptions of the source; observer callbacks are not moved.
          /// To have observer callbacks invoked on a scheduler (for instance to hand processing off to a dedicated thread), use
          /// <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/> instead.
          /// </remarks>
          public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
          {
              if (source == null)
              {
                  throw new ArgumentNullException("source");
              }

              if (scheduler == null)
              {
                  throw new ArgumentNullException("scheduler");
              }

              return new AnonymousObservable<TSource>(observer =>
              {
                  // The serial slot initially holds the pending work item; when the work item runs,
                  // the slot is re-pointed at the actual subscription.
                  var pendingWork = new SingleAssignmentDisposable();
                  var current = new SerialDisposable { Disposable = pendingWork };

                  pendingWork.Disposable = scheduler.Schedule(() =>
                  {
                      var inner = source.SubscribeSafe(observer);

                      // Wrap the subscription together with the scheduler it was created on.
                      current.Disposable = new ScheduledDisposable(scheduler, inner);
                  });

                  return current;
              });
          }

#if !NO_SYNCCTX
          /// <summary>
          /// Returns a sequence that performs the subscription to, and unsubscription from, <paramref name="source"/> on the given synchronization context.
          /// </summary>
          /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
          /// <param name="source">The sequence to subscribe to.</param>
          /// <param name="context">The synchronization context on which subscribing and unsubscribing take place.</param>
          /// <returns>A sequence whose subscribe and unsubscribe actions are carried out on <paramref name="context"/>.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
          /// <remarks>
          /// This affects only the side-effects of subscribing to and disposing subscriptions of the source; observer callbacks are not moved.
          /// To have observer callbacks invoked on a synchronization context (for instance to post them to a UI thread it represents), use
          /// <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/> instead.
          /// </remarks>
          public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
          {
              if (source == null)
              {
                  throw new ArgumentNullException("source");
              }

              if (context == null)
              {
                  throw new ArgumentNullException("context");
              }

              return new AnonymousObservable<TSource>(observer =>
              {
                  var handle = new SingleAssignmentDisposable();

                  context.PostWithStartComplete(() =>
                  {
                      // Nothing to do if the caller already disposed before the posted work ran.
                      if (handle.IsDisposed)
                      {
                          return;
                      }

                      var inner = source.SubscribeSafe(observer);
                      handle.Disposable = new ContextDisposable(context, inner);
                  });

                  return handle;
              });
          }
#endif

          #endregion

          #region ObserveOn

          /// <summary>
          /// Returns a sequence that delivers the observer callbacks of <paramref name="source"/> on the given scheduler.
          /// </summary>
          /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
          /// <param name="source">The sequence being observed.</param>
          /// <param name="scheduler">The scheduler used to notify observers.</param>
          /// <returns>A sequence whose observations take place on <paramref name="scheduler"/>.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
          public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
          {
              if (source == null)
              {
                  throw new ArgumentNullException("source");
              }

              if (scheduler == null)
              {
                  throw new ArgumentNullException("scheduler");
              }

#if !NO_PERF
              return new ObserveOn<TSource>(source, scheduler);
#else
              return new AnonymousObservable<TSource>(observer =>
              {
                  var relay = new ObserveOnObserver<TSource>(scheduler, observer, null);
                  return source.Subscribe(relay);
              });
#endif
          }

#if !NO_SYNCCTX
          /// <summary>
          /// Returns a sequence that delivers the observer callbacks of <paramref name="source"/> on the given synchronization context.
          /// </summary>
          /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
          /// <param name="source">The sequence being observed.</param>
          /// <param name="context">The synchronization context used to notify observers.</param>
          /// <returns>A sequence whose observations take place on <paramref name="context"/>.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
          public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
          {
              if (source == null)
              {
                  throw new ArgumentNullException("source");
              }

              if (context == null)
              {
                  throw new ArgumentNullException("context");
              }

#if !NO_PERF
              return new ObserveOn<TSource>(source, context);
#else
              return new AnonymousObservable<TSource>(observer =>
              {
                  // Signal the start of the operation to the context before subscribing.
                  context.OperationStarted();

                  // Each notification is forwarded to the observer through a Post on the context.
                  var upstream = source.Subscribe(
                      value => context.Post(_ => observer.OnNext(value), null),
                      error => context.Post(_ => observer.OnError(error), null),
                      () => context.Post(_ => observer.OnCompleted(), null)
                  );

                  // The Finally callback pairs the OperationStarted call above with OperationCompleted.
                  return upstream.Finally(() => context.OperationCompleted());
              });
#endif
          }
#endif

          #endregion

          #region Synchronize

          /// <summary>
          /// Returns a sequence that makes sure the observer callbacks of <paramref name="source"/> are properly serialized.
          /// </summary>
          /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
          /// <param name="source">The sequence whose notifications are to be synchronized.</param>
          /// <returns>A sequence whose outgoing calls to observers are synchronized.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
          public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
          {
              if (source == null)
              {
                  throw new ArgumentNullException("source");
              }

#if !NO_PERF
              return new Synchronize<TSource>(source);
#else
              // A fresh gate object is created for every subscription.
              return new AnonymousObservable<TSource>(observer =>
                  source.Subscribe(Observer.Synchronize(observer, new object())));
#endif
          }

          /// <summary>
          /// Returns a sequence that synchronizes the observer callbacks of <paramref name="source"/> using the supplied gate object.
          /// </summary>
          /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
          /// <param name="source">The sequence whose notifications are to be synchronized.</param>
          /// <param name="gate">The gate object on which every observer call is synchronized.</param>
          /// <returns>A sequence whose outgoing calls to observers are synchronized on <paramref name="gate"/>.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
          public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
          {
              if (source == null)
              {
                  throw new ArgumentNullException("source");
              }

              if (gate == null)
              {
                  throw new ArgumentNullException("gate");
              }

#if !NO_PERF
              return new Synchronize<TSource>(source, gate);
#else
              return new AnonymousObservable<TSource>(observer =>
                  source.Subscribe(Observer.Synchronize(observer, gate)));
#endif
          }

          #endregion
      }
  }