Synchronization.cs 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.ComponentModel;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive.Concurrency
  8. {
  9. /// <summary>
  10. /// Provides basic synchronization and scheduling services for observable sequences.
  11. /// </summary>
  12. [EditorBrowsable(EditorBrowsableState.Advanced)]
  13. public static class Synchronization
  14. {
  15. #region SubscribeOn
  16. /// <summary>
  17. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.
  18. /// </summary>
  19. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  20. /// <param name="source">Source sequence.</param>
  21. /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>
  22. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
  23. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  24. /// <remarks>
  25. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler.
  26. /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.
  27. /// </remarks>
  28. public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
  29. {
  30. if (source == null)
  31. throw new ArgumentNullException(nameof(source));
  32. if (scheduler == null)
  33. throw new ArgumentNullException(nameof(scheduler));
  34. return new AnonymousObservable<TSource>(observer =>
  35. {
  36. var m = new SingleAssignmentDisposable();
  37. var d = new SerialDisposable();
  38. d.Disposable = m;
  39. m.Disposable = scheduler.Schedule(() =>
  40. {
  41. d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
  42. });
  43. return d;
  44. });
  45. }
  46. #if !NO_SYNCCTX
  47. /// <summary>
  48. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context.
  49. /// </summary>
  50. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  51. /// <param name="source">Source sequence.</param>
  52. /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>
  53. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
  54. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
  55. /// <remarks>
  56. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context.
  57. /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
  58. /// </remarks>
  59. public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
  60. {
  61. if (source == null)
  62. throw new ArgumentNullException(nameof(source));
  63. if (context == null)
  64. throw new ArgumentNullException(nameof(context));
  65. return new AnonymousObservable<TSource>(observer =>
  66. {
  67. var subscription = new SingleAssignmentDisposable();
  68. context.PostWithStartComplete(() =>
  69. {
  70. if (!subscription.IsDisposed)
  71. subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
  72. });
  73. return subscription;
  74. });
  75. }
  76. #endif
  77. #endregion
  78. #region ObserveOn
  79. /// <summary>
  80. /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
  81. /// </summary>
  82. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  83. /// <param name="source">Source sequence.</param>
  84. /// <param name="scheduler">Scheduler to notify observers on.</param>
  85. /// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
  86. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  87. public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
  88. {
  89. if (source == null)
  90. throw new ArgumentNullException(nameof(source));
  91. if (scheduler == null)
  92. throw new ArgumentNullException(nameof(scheduler));
  93. #if !NO_PERF
  94. return new ObserveOn<TSource>(source, scheduler);
  95. #else
  96. return new AnonymousObservable<TSource>(observer => source.Subscribe(new ObserveOnObserver<TSource>(scheduler, observer, null)));
  97. #endif
  98. }
  99. #if !NO_SYNCCTX
  100. /// <summary>
  101. /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
  102. /// </summary>
  103. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  104. /// <param name="source">Source sequence.</param>
  105. /// <param name="context">Synchronization context to notify observers on.</param>
  106. /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
  107. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
  108. public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
  109. {
  110. if (source == null)
  111. throw new ArgumentNullException(nameof(source));
  112. if (context == null)
  113. throw new ArgumentNullException(nameof(context));
  114. #if !NO_PERF
  115. return new ObserveOn<TSource>(source, context);
  116. #else
  117. return new AnonymousObservable<TSource>(observer =>
  118. {
  119. context.OperationStarted();
  120. return source.Subscribe(
  121. x => context.Post(_ =>
  122. {
  123. observer.OnNext(x);
  124. }, null),
  125. exception => context.Post(_ =>
  126. {
  127. observer.OnError(exception);
  128. }, null),
  129. () => context.Post(_ =>
  130. {
  131. observer.OnCompleted();
  132. }, null)
  133. ).Finally(() =>
  134. {
  135. context.OperationCompleted();
  136. });
  137. });
  138. #endif
  139. }
  140. #endif
  141. #endregion
  142. #region Synchronize
  143. /// <summary>
  144. /// Wraps the source sequence in order to ensure observer callbacks are properly serialized.
  145. /// </summary>
  146. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  147. /// <param name="source">Source sequence.</param>
  148. /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
  149. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  150. public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
  151. {
  152. if (source == null)
  153. throw new ArgumentNullException(nameof(source));
  154. #if !NO_PERF
  155. return new Synchronize<TSource>(source);
  156. #else
  157. return new AnonymousObservable<TSource>(observer =>
  158. {
  159. var gate = new object();
  160. return source.Subscribe(Observer.Synchronize(observer, gate));
  161. });
  162. #endif
  163. }
  164. /// <summary>
  165. /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
  166. /// </summary>
  167. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  168. /// <param name="source">Source sequence.</param>
  169. /// <param name="gate">Gate object to synchronize each observer call on.</param>
  170. /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
  171. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
  172. public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
  173. {
  174. if (source == null)
  175. throw new ArgumentNullException(nameof(source));
  176. if (gate == null)
  177. throw new ArgumentNullException(nameof(gate));
  178. #if !NO_PERF
  179. return new Synchronize<TSource>(source, gate);
  180. #else
  181. return new AnonymousObservable<TSource>(observer =>
  182. {
  183. return source.Subscribe(Observer.Synchronize(observer, gate));
  184. });
  185. #endif
  186. }
  187. #endregion
  188. }
  189. }