1
0

Synchronization.cs 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.ComponentModel;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive.Concurrency
  8. {
  9. /// <summary>
  10. /// Provides basic synchronization and scheduling services for observable sequences.
  11. /// </summary>
  12. [EditorBrowsable(EditorBrowsableState.Advanced)]
  13. public static class Synchronization
  14. {
  15. #region SubscribeOn
  16. /// <summary>
  17. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.
  18. /// </summary>
  19. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  20. /// <param name="source">Source sequence.</param>
  21. /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>
  22. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
  23. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  24. /// <remarks>
  25. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler.
  26. /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.
  27. /// </remarks>
  28. public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
  29. {
  30. if (source == null)
  31. throw new ArgumentNullException(nameof(source));
  32. if (scheduler == null)
  33. throw new ArgumentNullException(nameof(scheduler));
  34. return new SubscribeOnObservable<TSource>(source, scheduler);
  35. }
  36. sealed class SubscribeOnObservable<TSource> : ObservableBase<TSource>
  37. {
  38. readonly IObservable<TSource> source;
  39. readonly IScheduler scheduler;
  40. public SubscribeOnObservable(IObservable<TSource> source, IScheduler scheduler)
  41. {
  42. this.source = source;
  43. this.scheduler = scheduler;
  44. }
  45. protected override IDisposable SubscribeCore(IObserver<TSource> observer)
  46. {
  47. var m = new SingleAssignmentDisposable();
  48. var d = new SerialDisposable();
  49. d.Disposable = m;
  50. m.Disposable = scheduler.Schedule((source, observer, d),
  51. (scheduler, state) =>
  52. {
  53. state.d.Disposable = new ScheduledDisposable(scheduler, state.source.SubscribeSafe(state.observer));
  54. return Disposable.Empty;
  55. });
  56. return d;
  57. }
  58. }
  59. /// <summary>
  60. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context.
  61. /// </summary>
  62. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  63. /// <param name="source">Source sequence.</param>
  64. /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>
  65. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
  66. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is <c>null</c>.</exception>
  67. /// <remarks>
  68. /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context.
  69. /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
  70. /// </remarks>
  71. public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
  72. {
  73. if (source == null)
  74. throw new ArgumentNullException(nameof(source));
  75. if (context == null)
  76. throw new ArgumentNullException(nameof(context));
  77. return new SubscribeOnCtxObservable<TSource>(source, context);
  78. }
  79. sealed class SubscribeOnCtxObservable<TSource> : ObservableBase<TSource>
  80. {
  81. readonly IObservable<TSource> source;
  82. readonly SynchronizationContext context;
  83. public SubscribeOnCtxObservable(IObservable<TSource> source, SynchronizationContext context)
  84. {
  85. this.source = source;
  86. this.context = context;
  87. }
  88. protected override IDisposable SubscribeCore(IObserver<TSource> observer)
  89. {
  90. var subscription = new SingleAssignmentDisposable();
  91. context.PostWithStartComplete(() =>
  92. {
  93. if (!subscription.IsDisposed)
  94. {
  95. subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
  96. }
  97. });
  98. return subscription;
  99. }
  100. }
  101. #endregion
  102. #region ObserveOn
  103. /// <summary>
  104. /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
  105. /// </summary>
  106. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  107. /// <param name="source">Source sequence.</param>
  108. /// <param name="scheduler">Scheduler to notify observers on.</param>
  109. /// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
  110. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  111. public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
  112. {
  113. if (source == null)
  114. throw new ArgumentNullException(nameof(source));
  115. if (scheduler == null)
  116. throw new ArgumentNullException(nameof(scheduler));
  117. return new ObserveOn<TSource>.Scheduler(source, scheduler);
  118. }
  119. /// <summary>
  120. /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
  121. /// </summary>
  122. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  123. /// <param name="source">Source sequence.</param>
  124. /// <param name="context">Synchronization context to notify observers on.</param>
  125. /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
  126. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is <c>null</c>.</exception>
  127. public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
  128. {
  129. if (source == null)
  130. throw new ArgumentNullException(nameof(source));
  131. if (context == null)
  132. throw new ArgumentNullException(nameof(context));
  133. return new ObserveOn<TSource>.Context(source, context);
  134. }
  135. #endregion
  136. #region Synchronize
  137. /// <summary>
  138. /// Wraps the source sequence in order to ensure observer callbacks are properly serialized.
  139. /// </summary>
  140. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  141. /// <param name="source">Source sequence.</param>
  142. /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
  143. /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
  144. public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
  145. {
  146. if (source == null)
  147. throw new ArgumentNullException(nameof(source));
  148. return new Synchronize<TSource>(source);
  149. }
  150. /// <summary>
  151. /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
  152. /// </summary>
  153. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  154. /// <param name="source">Source sequence.</param>
  155. /// <param name="gate">Gate object to synchronize each observer call on.</param>
  156. /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
  157. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is <c>null</c>.</exception>
  158. public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
  159. {
  160. if (source == null)
  161. throw new ArgumentNullException(nameof(source));
  162. if (gate == null)
  163. throw new ArgumentNullException(nameof(gate));
  164. return new Synchronize<TSource>(source, gate);
  165. }
  166. #endregion
  167. }
  168. }