Observable.Concurrency.cs 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Threading;
  6. namespace System.Reactive.Linq
  7. {
  8. public static partial class Observable
  9. {
  10. #region + ObserveOn +
  11. /// <summary>
  12. /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
  13. /// </summary>
  14. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  15. /// <param name="source">Source sequence.</param>
  16. /// <param name="scheduler">Scheduler to notify observers on.</param>
  17. /// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
  18. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  19. /// <remarks>
  20. /// This only invokes observer callbacks on a scheduler. In case the subscription and/or unsubscription actions have side-effects
  21. /// that require to be run on a scheduler, use <see cref="Observable.SubscribeOn{TSource}(IObservable{TSource}, IScheduler)"/>.
  22. /// </remarks>
  23. public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)
  24. {
  25. if (source == null)
  26. throw new ArgumentNullException(nameof(source));
  27. if (scheduler == null)
  28. throw new ArgumentNullException(nameof(scheduler));
  29. return s_impl.ObserveOn<TSource>(source, scheduler);
  30. }
  31. #if !NO_SYNCCTX
  32. /// <summary>
  33. /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
  34. /// </summary>
  35. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  36. /// <param name="source">Source sequence.</param>
  37. /// <param name="context">Synchronization context to notify observers on.</param>
  38. /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
  39. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
  40. /// <remarks>
  41. /// This only invokes observer callbacks on a synchronization context. In case the subscription and/or unsubscription actions have side-effects
  42. /// that require to be run on a synchronization context, use <see cref="Observable.SubscribeOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
  43. /// </remarks>
  44. public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, SynchronizationContext context)
  45. {
  46. if (source == null)
  47. throw new ArgumentNullException(nameof(source));
  48. if (context == null)
  49. throw new ArgumentNullException(nameof(context));
  50. return s_impl.ObserveOn<TSource>(source, context);
  51. }
  52. #endif
  53. #endregion
  54. #region + SubscribeOn +
  55. /// <summary>
  56. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used;
  57. /// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.
  58. /// </summary>
  59. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  60. /// <param name="source">Source sequence.</param>
  61. /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>
  62. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
  63. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  64. /// <remarks>
  65. /// This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer
  66. /// callbacks on a scheduler, use <see cref="Observable.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.
  67. /// </remarks>
  68. public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)
  69. {
  70. if (source == null)
  71. throw new ArgumentNullException(nameof(source));
  72. if (scheduler == null)
  73. throw new ArgumentNullException(nameof(scheduler));
  74. return s_impl.SubscribeOn<TSource>(source, scheduler);
  75. }
  76. #if !NO_SYNCCTX
  77. /// <summary>
  78. /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. This operation is not commonly used;
  79. /// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.
  80. /// </summary>
  81. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  82. /// <param name="source">Source sequence.</param>
  83. /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>
  84. /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
  85. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
  86. /// <remarks>
  87. /// This only performs the side-effects of subscription and unsubscription on the specified synchronization context. In order to invoke observer
  88. /// callbacks on a synchronization context, use <see cref="Observable.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
  89. /// </remarks>
  90. public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, SynchronizationContext context)
  91. {
  92. if (source == null)
  93. throw new ArgumentNullException(nameof(source));
  94. if (context == null)
  95. throw new ArgumentNullException(nameof(context));
  96. return s_impl.SubscribeOn<TSource>(source, context);
  97. }
  98. #endif
  99. #endregion
  100. #region + Synchronize +
  101. /// <summary>
  102. /// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently.
  103. /// This overload is useful to "fix" an observable sequence that exhibits concurrent callbacks on individual observers, which is invalid behavior for the query processor.
  104. /// </summary>
  105. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  106. /// <param name="source">Source sequence.</param>
  107. /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
  108. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  109. /// <remarks>
  110. /// It's invalid behavior - according to the observer grammar - for a sequence to exhibit concurrent callbacks on a given observer.
  111. /// This operator can be used to "fix" a source that doesn't conform to this rule.
  112. /// </remarks>
  113. public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source)
  114. {
  115. if (source == null)
  116. throw new ArgumentNullException(nameof(source));
  117. return s_impl.Synchronize<TSource>(source);
  118. }
  119. /// <summary>
  120. /// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently, using the specified gate object.
  121. /// This overload is useful when writing n-ary query operators, in order to prevent concurrent callbacks from different sources by synchronizing on a common gate object.
  122. /// </summary>
  123. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  124. /// <param name="source">Source sequence.</param>
  125. /// <param name="gate">Gate object to synchronize each observer call on.</param>
  126. /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
  127. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
  128. public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source, object gate)
  129. {
  130. if (source == null)
  131. throw new ArgumentNullException(nameof(source));
  132. if (gate == null)
  133. throw new ArgumentNullException(nameof(gate));
  134. return s_impl.Synchronize<TSource>(source, gate);
  135. }
  136. #endregion
  137. }
  138. }