1
0

Observer.Extensions.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Concurrency;
  3. using System.Threading;
  4. namespace System.Reactive
  5. {
  6. /// <summary>
  7. /// Provides a set of static methods for creating observers.
  8. /// </summary>
  9. public static class Observer
  10. {
  /// <summary>
  /// Wraps a notification handler in an observer.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  /// <param name="handler">Callback that is handed one notification per message received by the observer.</param>
  /// <returns>An observer that converts each OnNext, OnError and OnCompleted call into the corresponding notification and passes it to <paramref name="handler"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
  public static IObserver<T> ToObserver<T>(this Action<Notification<T>> handler)
  {
      if (handler == null)
      {
          throw new ArgumentNullException("handler");
      }

      // One adapter per observer method; each materializes the message as a notification.
      Action<T> forwardNext = value => handler(Notification.CreateOnNext<T>(value));
      Action<Exception> forwardError = error => handler(Notification.CreateOnError<T>(error));
      Action forwardCompleted = () => handler(Notification.CreateOnCompleted<T>());

      return new AnonymousObserver<T>(forwardNext, forwardError, forwardCompleted);
  }
  /// <summary>
  /// Builds a notification callback that feeds an observer.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer that should receive the notifications.</param>
  /// <returns>An action that makes each notification it is given accept <paramref name="observer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1704:IdentifiersShouldBeSpelledCorrectly", MessageId = "Notifier", Justification = "Backward compat.")]
  public static Action<Notification<T>> ToNotifier<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      return delegate(Notification<T> notification)
      {
          notification.Accept(observer);
      };
  }
  /// <summary>
  /// Builds an observer whose OnNext behavior is supplied by the caller.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <returns>An observer backed by the given action.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext)
  {
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      IObserver<T> created = new AnonymousObserver<T>(onNext);
      return created;
  }
  /// <summary>
  /// Builds an observer whose OnNext and OnError behavior is supplied by the caller.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <param name="onError">Action used as the observer's OnError implementation.</param>
  /// <returns>An observer backed by the given actions.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
  {
      // Arguments are validated in declaration order so the first null one is reported.
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onError == null)
      {
          throw new ArgumentNullException("onError");
      }

      IObserver<T> created = new AnonymousObserver<T>(onNext, onError);
      return created;
  }
  /// <summary>
  /// Builds an observer whose OnNext and OnCompleted behavior is supplied by the caller.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <param name="onCompleted">Action used as the observer's OnCompleted implementation.</param>
  /// <returns>An observer backed by the given actions.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
  {
      // Arguments are validated in declaration order so the first null one is reported.
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException("onCompleted");
      }

      IObserver<T> created = new AnonymousObserver<T>(onNext, onCompleted);
      return created;
  }
  /// <summary>
  /// Builds an observer whose OnNext, OnError and OnCompleted behavior is supplied by the caller.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <param name="onError">Action used as the observer's OnError implementation.</param>
  /// <param name="onCompleted">Action used as the observer's OnCompleted implementation.</param>
  /// <returns>An observer backed by the given actions.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  {
      // Arguments are validated in declaration order so the first null one is reported.
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onError == null)
      {
          throw new ArgumentNullException("onError");
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException("onCompleted");
      }

      IObserver<T> created = new AnonymousObserver<T>(onNext, onError, onCompleted);
      return created;
  }
  /// <summary>
  /// Conceals the identity of an observer behind a forwarding wrapper.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  /// <param name="observer">The observer whose identity should be hidden.</param>
  /// <returns>A new observer that forwards OnNext, OnError and OnCompleted to <paramref name="observer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static IObserver<T> AsObserver<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      // Bind the three observer methods as delegates; the wrapper only holds these.
      Action<T> next = observer.OnNext;
      Action<Exception> error = observer.OnError;
      Action completed = observer.OnCompleted;

      return new AnonymousObserver<T>(next, error, completed);
  }
  /// <summary>
  /// Guards an observer against violations of the observer grammar, such as repeated OnError or OnCompleted calls and
  /// reentrant calls into any of the observer methods.
  /// The offending observer method call throws an InvalidOperationException when a violation is detected.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  /// <param name="observer">The observer whose callback invocations should be verified against the grammar.</param>
  /// <returns>An observer that validates each callback invocation and forwards it to <paramref name="observer"/> when the check passes.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static IObserver<T> Checked<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      IObserver<T> guarded = new CheckedObserver<T>(observer);
      return guarded;
  }
  /// <summary>
  /// Serializes access to the observer so that its callback methods are never invoked concurrently from multiple threads.
  /// Reentrant callbacks made on the same thread remain possible.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  /// <param name="observer">The observer whose callbacks should be synchronized.</param>
  /// <returns>An observer that delivers callbacks to <paramref name="observer"/> in a synchronized manner.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  /// <remarks>
  /// Synchronization relies on a <see cref="System.Threading.Monitor">Monitor</see>, which does not guard against reentrancy from the owning thread.
  /// Overlapping observer callbacks, which the observer grammar forbids, can therefore still occur. To rule those out too,
  /// call the <see cref="Synchronize{T}(IObserver{T}, bool)"/> overload with true as the second argument.
  /// </remarks>
  public static IObserver<T> Synchronize<T>(IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      // A fresh gate object, created for this wrapper only.
      object gate = new object();
      return new SynchronizedObserver<T>(observer, gate);
  }
  /// <summary>
  /// Serializes access to the observer so that its callback methods are never invoked concurrently.
  /// The kind of lock used is selected through <paramref name="preventReentrancy"/>.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  /// <param name="observer">The observer whose callbacks should be synchronized.</param>
  /// <param name="preventReentrancy">When true, reentrant observer callbacks are queued and delivered to the observer sequentially.</param>
  /// <returns>An observer that delivers callbacks to <paramref name="observer"/> in a synchronized manner.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  /// <remarks>
  /// Passing false for <paramref name="preventReentrancy"/> gives the same behavior as the <see cref="Synchronize{T}(IObserver{T})"/> overload,
  /// which synchronizes with a <see cref="System.Threading.Monitor">Monitor</see>. Passing true uses an <see cref="AsyncLock"/>
  /// that queues callbacks to the observer when a reentrant call is made.
  /// </remarks>
  public static IObserver<T> Synchronize<T>(IObserver<T> observer, bool preventReentrancy)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (!preventReentrancy)
      {
          // Monitor-based variant: a fresh gate object per wrapper.
          return new SynchronizedObserver<T>(observer, new object());
      }

      // Queueing variant: a fresh AsyncLock per wrapper.
      return new AsyncLockObserver<T>(observer, new AsyncLock());
  }
  /// <summary>
  /// Serializes access to the observer so that its callback methods are never invoked concurrently by multiple threads,
  /// locking on the supplied gate object with a <see cref="System.Threading.Monitor">Monitor</see>-based lock.
  /// Sharing one gate object lets several observers that touch common state be coordinated.
  /// Reentrant callbacks made on the same thread remain possible.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  /// <param name="observer">The observer whose callbacks should be synchronized.</param>
  /// <param name="gate">Object on which every observer call is synchronized.</param>
  /// <returns>An observer that delivers callbacks to <paramref name="observer"/> in a synchronized manner.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="gate"/> is null.</exception>
  /// <remarks>
  /// Synchronization relies on a <see cref="System.Threading.Monitor">Monitor</see>, which does not guard against reentrancy from the owning thread.
  /// Overlapping observer callbacks, which the observer grammar forbids, can therefore still occur. To rule those out too,
  /// use the <see cref="Synchronize{T}(IObserver{T}, AsyncLock)"/> overload.
  /// </remarks>
  public static IObserver<T> Synchronize<T>(IObserver<T> observer, object gate)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (gate == null)
      {
          throw new ArgumentNullException("gate");
      }

      IObserver<T> synchronized = new SynchronizedObserver<T>(observer, gate);
      return synchronized;
  }
  /// <summary>
  /// Serializes access to the observer so that its callback methods are never invoked concurrently, using the supplied
  /// asynchronous lock to guard against both concurrent and reentrant access.
  /// Sharing one asynchronous lock lets several observers that touch common state be coordinated.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  /// <param name="observer">The observer whose callbacks should be synchronized.</param>
  /// <param name="asyncLock">Asynchronous lock on which every observer call is synchronized.</param>
  /// <returns>An observer that delivers callbacks to <paramref name="observer"/> in a synchronized manner.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="asyncLock"/> is null.</exception>
  public static IObserver<T> Synchronize<T>(IObserver<T> observer, AsyncLock asyncLock)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (asyncLock == null)
      {
          throw new ArgumentNullException("asyncLock");
      }

      IObserver<T> synchronized = new AsyncLockObserver<T>(observer, asyncLock);
      return synchronized;
  }
  /// <summary>
  /// Routes the invocation of the observer's methods through the given scheduler.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  /// <param name="observer">The observer whose messages should be scheduled.</param>
  /// <param name="scheduler">Scheduler on which the observer messages are scheduled.</param>
  /// <returns>An observer whose messages are scheduled on <paramref name="scheduler"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, IScheduler scheduler)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      IObserver<T> scheduled = new ObserveOnObserver<T>(scheduler, observer, null);
      return scheduled;
  }
  231. #if !NO_SYNCCTX
  /// <summary>
  /// Routes the invocation of the observer's methods through the given synchronization context.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
  /// <param name="observer">The observer whose messages should be scheduled.</param>
  /// <param name="context">Synchronization context on which the observer messages are scheduled.</param>
  /// <returns>An observer whose messages are scheduled on <paramref name="context"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="context"/> is null.</exception>
  public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, SynchronizationContext context)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (context == null)
      {
          throw new ArgumentNullException("context");
      }

      // Adapt the context to a scheduler so the scheduler-based observer wrapper can be used.
      SynchronizationContextScheduler contextScheduler = new SynchronizationContextScheduler(context);
      return new ObserveOnObserver<T>(contextScheduler, observer, null);
  }
  248. #endif
  249. #if HAS_PROGRESS
  /// <summary>
  /// Exposes an observer as a progress object.
  /// </summary>
  /// <typeparam name="T">The type of the progress objects received by the source observer.</typeparam>
  /// <param name="observer">The observer to expose as a progress object.</param>
  /// <returns>A progress object whose Report calls are passed to the observer's OnNext method.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static IProgress<T> ToProgress<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      Action<T> report = observer.OnNext;
      return new AnonymousProgress<T>(report);
  }
  /// <summary>
  /// Exposes an observer as a progress object whose reporting method is invoked through the given scheduler.
  /// </summary>
  /// <typeparam name="T">The type of the progress objects received by the source observer.</typeparam>
  /// <param name="observer">The observer to expose as a progress object.</param>
  /// <param name="scheduler">Scheduler on which progress is reported.</param>
  /// <returns>A progress object whose Report calls correspond to the observer's OnNext messages.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
  public static IProgress<T> ToProgress<T>(this IObserver<T> observer, IScheduler scheduler)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      // Wrap the observer with the scheduler first, then report into the wrapper's OnNext.
      ObserveOnObserver<T> scheduled = new ObserveOnObserver<T>(scheduler, observer, null);
      return new AnonymousProgress<T>(scheduled.OnNext);
  }
  /// <summary>
  /// Progress object whose Report method invokes a delegate supplied at construction.
  /// </summary>
  /// <typeparam name="T">The type of the reported progress values.</typeparam>
  class AnonymousProgress<T> : IProgress<T>
  {
      // Delegate invoked for every reported value; assigned once in the constructor.
      private readonly Action<T> _report;

      /// <summary>
      /// Creates a progress object backed by the given delegate.
      /// </summary>
      /// <param name="progress">Delegate invoked with each reported value.</param>
      public AnonymousProgress(Action<T> progress)
      {
          this._report = progress;
      }

      /// <summary>
      /// Passes the reported value to the underlying delegate.
      /// </summary>
      /// <param name="value">The progress value to report.</param>
      public void Report(T value)
      {
          this._report.Invoke(value);
      }
  }
  /// <summary>
  /// Exposes a progress object as an observer.
  /// </summary>
  /// <typeparam name="T">The type of the progress objects received by the progress reporter.</typeparam>
  /// <param name="progress">The progress object to expose as an observer.</param>
  /// <returns>An observer whose OnNext messages are passed to the progress object's Report method.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="progress"/> is null.</exception>
  public static IObserver<T> ToObserver<T>(this IProgress<T> progress)
  {
      if (progress == null)
      {
          throw new ArgumentNullException("progress");
      }

      Action<T> onNext = progress.Report;
      return new AnonymousObserver<T>(onNext);
  }
  304. #endif
  305. }
  306. }