Observer.Extensions.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Threading;
  6. namespace System.Reactive
  7. {
  8. /// <summary>
  9. /// Provides a set of static methods for creating observers.
  10. /// </summary>
  11. public static class Observer
  12. {
  13. /// <summary>
  14. /// Creates an observer from a notification callback.
  15. /// </summary>
  16. /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  17. /// <param name="handler">Action that handles a notification.</param>
  18. /// <returns>The observer object that invokes the specified handler using a notification corresponding to each message it receives.</returns>
  19. /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
  20. public static IObserver<T> ToObserver<T>(this Action<Notification<T>> handler)
  21. {
  22. if (handler == null)
  23. throw new ArgumentNullException(nameof(handler));
  24. return new AnonymousObserver<T>(
  25. x => handler(Notification.CreateOnNext<T>(x)),
  26. exception => handler(Notification.CreateOnError<T>(exception)),
  27. () => handler(Notification.CreateOnCompleted<T>())
  28. );
  29. }
  30. /// <summary>
  31. /// Creates a notification callback from an observer.
  32. /// </summary>
  33. /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  34. /// <param name="observer">Observer object.</param>
  35. /// <returns>The action that forwards its input notification to the underlying observer.</returns>
  36. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  37. [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1704:IdentifiersShouldBeSpelledCorrectly", MessageId = "Notifier", Justification = "Backward compat.")]
  38. public static Action<Notification<T>> ToNotifier<T>(this IObserver<T> observer)
  39. {
  40. if (observer == null)
  41. throw new ArgumentNullException(nameof(observer));
  42. return n => n.Accept(observer);
  43. }
  44. /// <summary>
  45. /// Creates an observer from the specified OnNext action.
  46. /// </summary>
  47. /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  48. /// <param name="onNext">Observer's OnNext action implementation.</param>
  49. /// <returns>The observer object implemented using the given actions.</returns>
  50. /// <exception cref="ArgumentNullException"><paramref name="onNext"/> is null.</exception>
  51. public static IObserver<T> Create<T>(Action<T> onNext)
  52. {
  53. if (onNext == null)
  54. throw new ArgumentNullException(nameof(onNext));
  55. return new AnonymousObserver<T>(onNext);
  56. }
  57. /// <summary>
  58. /// Creates an observer from the specified OnNext and OnError actions.
  59. /// </summary>
  60. /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  61. /// <param name="onNext">Observer's OnNext action implementation.</param>
  62. /// <param name="onError">Observer's OnError action implementation.</param>
  63. /// <returns>The observer object implemented using the given actions.</returns>
  64. /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
  65. public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
  66. {
  67. if (onNext == null)
  68. throw new ArgumentNullException(nameof(onNext));
  69. if (onError == null)
  70. throw new ArgumentNullException(nameof(onError));
  71. return new AnonymousObserver<T>(onNext, onError);
  72. }
  73. /// <summary>
  74. /// Creates an observer from the specified OnNext and OnCompleted actions.
  75. /// </summary>
  76. /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  77. /// <param name="onNext">Observer's OnNext action implementation.</param>
  78. /// <param name="onCompleted">Observer's OnCompleted action implementation.</param>
  79. /// <returns>The observer object implemented using the given actions.</returns>
  80. /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
  81. public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
  82. {
  83. if (onNext == null)
  84. throw new ArgumentNullException(nameof(onNext));
  85. if (onCompleted == null)
  86. throw new ArgumentNullException(nameof(onCompleted));
  87. return new AnonymousObserver<T>(onNext, onCompleted);
  88. }
  89. /// <summary>
  90. /// Creates an observer from the specified OnNext, OnError, and OnCompleted actions.
  91. /// </summary>
  92. /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  93. /// <param name="onNext">Observer's OnNext action implementation.</param>
  94. /// <param name="onError">Observer's OnError action implementation.</param>
  95. /// <param name="onCompleted">Observer's OnCompleted action implementation.</param>
  96. /// <returns>The observer object implemented using the given actions.</returns>
  97. /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  98. public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  99. {
  100. if (onNext == null)
  101. throw new ArgumentNullException(nameof(onNext));
  102. if (onError == null)
  103. throw new ArgumentNullException(nameof(onError));
  104. if (onCompleted == null)
  105. throw new ArgumentNullException(nameof(onCompleted));
  106. return new AnonymousObserver<T>(onNext, onError, onCompleted);
  107. }
  108. /// <summary>
  109. /// Hides the identity of an observer.
  110. /// </summary>
  111. /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  112. /// <param name="observer">An observer whose identity to hide.</param>
  113. /// <returns>An observer that hides the identity of the specified observer.</returns>
  114. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  115. public static IObserver<T> AsObserver<T>(this IObserver<T> observer)
  116. {
  117. if (observer == null)
  118. throw new ArgumentNullException(nameof(observer));
  119. return new AnonymousObserver<T>(observer.OnNext, observer.OnError, observer.OnCompleted);
  120. }
  121. /// <summary>
  122. /// Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods.
  123. /// If a violation is detected, an InvalidOperationException is thrown from the offending observer method call.
  124. /// </summary>
  125. /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  126. /// <param name="observer">The observer whose callback invocations should be checked for grammar violations.</param>
  127. /// <returns>An observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.</returns>
  128. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  129. public static IObserver<T> Checked<T>(this IObserver<T> observer)
  130. {
  131. if (observer == null)
  132. throw new ArgumentNullException(nameof(observer));
  133. return new CheckedObserver<T>(observer);
  134. }
  135. /// <summary>
  136. /// Synchronizes access to the observer such that its callback methods cannot be called concurrently from multiple threads. This overload is useful when coordinating access to an observer.
  137. /// Notice reentrant observer callbacks on the same thread are still possible.
  138. /// </summary>
  139. /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  140. /// <param name="observer">The observer whose callbacks should be synchronized.</param>
  141. /// <returns>An observer that delivers callbacks to the specified observer in a synchronized manner.</returns>
  142. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  143. /// <remarks>
  144. /// Because a <see cref="System.Threading.Monitor">Monitor</see> is used to perform the synchronization, there's no protection against reentrancy from the same thread.
  145. /// Hence, overlapped observer callbacks are still possible, which is invalid behavior according to the observer grammar. In order to protect against this behavior as
  146. /// well, use the <see cref="Synchronize{T}(IObserver{T}, bool)"/> overload, passing true for the second parameter.
  147. /// </remarks>
  148. public static IObserver<T> Synchronize<T>(IObserver<T> observer)
  149. {
  150. if (observer == null)
  151. throw new ArgumentNullException(nameof(observer));
  152. return new SynchronizedObserver<T>(observer, new object());
  153. }
  154. /// <summary>
  155. /// Synchronizes access to the observer such that its callback methods cannot be called concurrently. This overload is useful when coordinating access to an observer.
  156. /// The <paramref name="preventReentrancy"/> parameter configures the type of lock used for synchronization.
  157. /// </summary>
  158. /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  159. /// <param name="observer">The observer whose callbacks should be synchronized.</param>
  160. /// <param name="preventReentrancy">If set to true, reentrant observer callbacks will be queued up and get delivered to the observer in a sequential manner.</param>
  161. /// <returns>An observer that delivers callbacks to the specified observer in a synchronized manner.</returns>
  162. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  163. /// <remarks>
  164. /// When the <paramref name="preventReentrancy"/> parameter is set to false, behavior is identical to the <see cref="Synchronize{T}(IObserver{T})"/> overload which uses
  165. /// a <see cref="System.Threading.Monitor">Monitor</see> for synchronization. When the <paramref name="preventReentrancy"/> parameter is set to true, an <see cref="AsyncLock"/>
  166. /// is used to queue up callbacks to the specified observer if a reentrant call is made.
  167. /// </remarks>
  168. public static IObserver<T> Synchronize<T>(IObserver<T> observer, bool preventReentrancy)
  169. {
  170. if (observer == null)
  171. throw new ArgumentNullException(nameof(observer));
  172. if (preventReentrancy)
  173. return new AsyncLockObserver<T>(observer, new AsyncLock());
  174. else
  175. return new SynchronizedObserver<T>(observer, new object());
  176. }
  177. /// <summary>
  178. /// Synchronizes access to the observer such that its callback methods cannot be called concurrently by multiple threads, using the specified gate object for use by a <see cref="System.Threading.Monitor">Monitor</see>-based lock.
  179. /// This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common gate object.
  180. /// Notice reentrant observer callbacks on the same thread are still possible.
  181. /// </summary>
  182. /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  183. /// <param name="observer">The observer whose callbacks should be synchronized.</param>
  184. /// <param name="gate">Gate object to synchronize each observer call on.</param>
  185. /// <returns>An observer that delivers callbacks to the specified observer in a synchronized manner.</returns>
  186. /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="gate"/> is null.</exception>
  187. /// <remarks>
  188. /// Because a <see cref="System.Threading.Monitor">Monitor</see> is used to perform the synchronization, there's no protection against reentrancy from the same thread.
  189. /// Hence, overlapped observer callbacks are still possible, which is invalid behavior according to the observer grammar. In order to protect against this behavior as
  190. /// well, use the <see cref="Synchronize{T}(IObserver{T}, AsyncLock)"/> overload.
  191. /// </remarks>
  192. public static IObserver<T> Synchronize<T>(IObserver<T> observer, object gate)
  193. {
  194. if (observer == null)
  195. throw new ArgumentNullException(nameof(observer));
  196. if (gate == null)
  197. throw new ArgumentNullException(nameof(gate));
  198. return new SynchronizedObserver<T>(observer, gate);
  199. }
  200. /// <summary>
  201. /// Synchronizes access to the observer such that its callback methods cannot be called concurrently, using the specified asynchronous lock to protect against concurrent and reentrant access.
  202. /// This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common asynchronous lock.
  203. /// </summary>
  204. /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  205. /// <param name="observer">The observer whose callbacks should be synchronized.</param>
  206. /// <param name="asyncLock">Gate object to synchronize each observer call on.</param>
  207. /// <returns>An observer that delivers callbacks to the specified observer in a synchronized manner.</returns>
  208. /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="asyncLock"/> is null.</exception>
  209. public static IObserver<T> Synchronize<T>(IObserver<T> observer, AsyncLock asyncLock)
  210. {
  211. if (observer == null)
  212. throw new ArgumentNullException(nameof(observer));
  213. if (asyncLock == null)
  214. throw new ArgumentNullException(nameof(asyncLock));
  215. return new AsyncLockObserver<T>(observer, asyncLock);
  216. }
  217. /// <summary>
  218. /// Schedules the invocation of observer methods on the given scheduler.
  219. /// </summary>
  220. /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  221. /// <param name="observer">The observer to schedule messages for.</param>
  222. /// <param name="scheduler">Scheduler to schedule observer messages on.</param>
  223. /// <returns>Observer whose messages are scheduled on the given scheduler.</returns>
  224. /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
  225. public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, IScheduler scheduler)
  226. {
  227. if (observer == null)
  228. throw new ArgumentNullException(nameof(observer));
  229. if (scheduler == null)
  230. throw new ArgumentNullException(nameof(scheduler));
  231. return new ObserveOnObserver<T>(scheduler, observer, null);
  232. }
  233. #if !NO_SYNCCTX
  234. /// <summary>
  235. /// Schedules the invocation of observer methods on the given synchonization context.
  236. /// </summary>
  237. /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  238. /// <param name="observer">The observer to schedule messages for.</param>
  239. /// <param name="context">Synchonization context to schedule observer messages on.</param>
  240. /// <returns>Observer whose messages are scheduled on the given synchonization context.</returns>
  241. /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="context"/> is null.</exception>
  242. public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, SynchronizationContext context)
  243. {
  244. if (observer == null)
  245. throw new ArgumentNullException(nameof(observer));
  246. if (context == null)
  247. throw new ArgumentNullException(nameof(context));
  248. return new ObserveOnObserver<T>(new SynchronizationContextScheduler(context), observer, null);
  249. }
  250. #endif
  251. #if HAS_PROGRESS
  252. /// <summary>
  253. /// Converts an observer to a progress object.
  254. /// </summary>
  255. /// <typeparam name="T">The type of the progress objects received by the source observer.</typeparam>
  256. /// <param name="observer">The observer to convert.</param>
  257. /// <returns>Progress object whose Report messages correspond to the observer's OnNext messages.</returns>
  258. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  259. public static IProgress<T> ToProgress<T>(this IObserver<T> observer)
  260. {
  261. if (observer == null)
  262. throw new ArgumentNullException(nameof(observer));
  263. return new AnonymousProgress<T>(observer.OnNext);
  264. }
  265. /// <summary>
  266. /// Converts an observer to a progress object, using the specified scheduler to invoke the progress reporting method.
  267. /// </summary>
  268. /// <typeparam name="T">The type of the progress objects received by the source observer.</typeparam>
  269. /// <param name="observer">The observer to convert.</param>
  270. /// <param name="scheduler">Scheduler to report progress on.</param>
  271. /// <returns>Progress object whose Report messages correspond to the observer's OnNext messages.</returns>
  272. /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
  273. public static IProgress<T> ToProgress<T>(this IObserver<T> observer, IScheduler scheduler)
  274. {
  275. if (observer == null)
  276. throw new ArgumentNullException(nameof(observer));
  277. if (scheduler == null)
  278. throw new ArgumentNullException(nameof(scheduler));
  279. return new AnonymousProgress<T>(new ObserveOnObserver<T>(scheduler, observer, null).OnNext);
  280. }
  281. class AnonymousProgress<T> : IProgress<T>
  282. {
  283. private readonly Action<T> _progress;
  284. public AnonymousProgress(Action<T> progress)
  285. {
  286. _progress = progress;
  287. }
  288. public void Report(T value)
  289. {
  290. _progress(value);
  291. }
  292. }
  293. /// <summary>
  294. /// Converts a progress object to an observer.
  295. /// </summary>
  296. /// <typeparam name="T">The type of the progress objects received by the progress reporter.</typeparam>
  297. /// <param name="progress">The progress object to convert.</param>
  298. /// <returns>Observer whose OnNext messages correspond to the progress object's Report messages.</returns>
  299. /// <exception cref="ArgumentNullException"><paramref name="progress"/> is null.</exception>
  300. public static IObserver<T> ToObserver<T>(this IProgress<T> progress)
  301. {
  302. if (progress == null)
  303. throw new ArgumentNullException(nameof(progress));
  304. return new AnonymousObserver<T>(progress.Report);
  305. }
  306. #endif
  307. }
  308. }