Observer.Extensions.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Concurrency;
  3. using System.Threading;
  4. namespace System.Reactive
  5. {
  6. /// <summary>
  7. /// Provides a set of static methods for creating observers.
  8. /// </summary>
  9. public static class Observer
  10. {
  11. /// <summary>
  12. /// Creates an observer from a notification callback.
  13. /// </summary>
  14. /// <param name="handler">Action that handles a notification.</param>
  15. /// <returns>The observer object that invokes the specified handler using a notification corresponding to each message it receives.</returns>
  16. public static IObserver<T> ToObserver<T>(this Action<Notification<T>> handler)
  17. {
  18. if (handler == null)
  19. throw new ArgumentNullException("handler");
  20. return new AnonymousObserver<T>(
  21. x => handler(Notification.CreateOnNext<T>(x)),
  22. exception => handler(Notification.CreateOnError<T>(exception)),
  23. () => handler(Notification.CreateOnCompleted<T>()));
  24. }
  25. /// <summary>
  26. /// Creates a notification callback from an observer.
  27. /// </summary>
  28. /// <param name="observer">Observer object.</param>
  29. /// <returns>The action that forwards its input notification to the underlying observer.</returns>
  30. public static Action<Notification<T>> ToNotifier<T>(this IObserver<T> observer)
  31. {
  32. if (observer == null)
  33. throw new ArgumentNullException("observer");
  34. return n => n.Accept(observer);
  35. }
  36. /// <summary>
  37. /// Creates an observer from the specified OnNext action.
  38. /// </summary>
  39. /// <param name="onNext">Observer's OnNext action implementation.</param>
  40. /// <returns>The observer object implemented using the given actions.</returns>
  41. public static IObserver<T> Create<T>(Action<T> onNext)
  42. {
  43. if (onNext == null)
  44. throw new ArgumentNullException("onNext");
  45. return new AnonymousObserver<T>(onNext);
  46. }
  47. /// <summary>
  48. /// Creates an observer from the specified OnNext and OnError actions.
  49. /// </summary>
  50. /// <param name="onNext">Observer's OnNext action implementation.</param>
  51. /// <param name="onError">Observer's OnError action implementation.</param>
  52. /// <returns>The observer object implemented using the given actions.</returns>
  53. public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
  54. {
  55. if (onNext == null)
  56. throw new ArgumentNullException("onNext");
  57. if (onError == null)
  58. throw new ArgumentNullException("onError");
  59. return new AnonymousObserver<T>(onNext, onError);
  60. }
  61. /// <summary>
  62. /// Creates an observer from the specified OnNext and OnCompleted actions.
  63. /// </summary>
  64. /// <param name="onNext">Observer's OnNext action implementation.</param>
  65. /// <param name="onCompleted">Observer's OnCompleted action implementation.</param>
  66. /// <returns>The observer object implemented using the given actions.</returns>
  67. public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
  68. {
  69. if (onNext == null)
  70. throw new ArgumentNullException("onNext");
  71. if (onCompleted == null)
  72. throw new ArgumentNullException("onCompleted");
  73. return new AnonymousObserver<T>(onNext, onCompleted);
  74. }
  75. /// <summary>
  76. /// Creates an observer from the specified OnNext, OnError, and OnCompleted actions.
  77. /// </summary>
  78. /// <param name="onNext">Observer's OnNext action implementation.</param>
  79. /// <param name="onError">Observer's OnError action implementation.</param>
  80. /// <param name="onCompleted">Observer's OnCompleted action implementation.</param>
  81. /// <returns>The observer object implemented using the given actions.</returns>
  82. public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  83. {
  84. if (onNext == null)
  85. throw new ArgumentNullException("onNext");
  86. if (onError == null)
  87. throw new ArgumentNullException("onError");
  88. if (onCompleted == null)
  89. throw new ArgumentNullException("onCompleted");
  90. return new AnonymousObserver<T>(onNext, onError, onCompleted);
  91. }
  92. /// <summary>
  93. /// Hides the identity of an observer.
  94. /// </summary>
  95. /// <param name="observer">An observer whose identity to hide.</param>
  96. /// <returns>An observer that hides the identity of the specified observer.</returns>
  97. public static IObserver<T> AsObserver<T>(this IObserver<T> observer)
  98. {
  99. if (observer == null)
  100. throw new ArgumentNullException("observer");
  101. return new AnonymousObserver<T>(observer.OnNext, observer.OnError, observer.OnCompleted);
  102. }
  103. /// <summary>
  104. /// Synchronizes the observer messages.
  105. /// </summary>
  106. /// <param name="observer">The observer to synchronize.</param>
  107. /// <param name="gate">Gate object to synchronize each observer call on.</param>
  108. /// <returns>The observer whose messages are synchronized on the given gate object.</returns>
  109. public static IObserver<T> Synchronize<T>(IObserver<T> observer, object gate)
  110. {
  111. if (observer == null)
  112. throw new ArgumentNullException("observer");
  113. if (gate == null)
  114. throw new ArgumentNullException("gate");
  115. return new SynchronizedObserver<T>(observer, gate);
  116. }
  117. /// <summary>
  118. /// Synchronizes the observer messages.
  119. /// </summary>
  120. /// <param name="observer">The observer to synchronize.</param>
  121. /// <returns>The observer whose messages are synchronized.</returns>
  122. public static IObserver<T> Synchronize<T>(IObserver<T> observer)
  123. {
  124. if (observer == null)
  125. throw new ArgumentNullException("observer");
  126. return Synchronize(observer, new object());
  127. }
  128. /// <summary>
  129. /// Schedules the observer messages on the given scheduler.
  130. /// </summary>
  131. /// <param name="observer">The observer to schedule messages for.</param>
  132. /// <param name="scheduler">Scheduler to schedule observer messages on.</param>
  133. /// <returns>Observer whose messages are scheduled on the given scheduler.</returns>
  134. public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, IScheduler scheduler)
  135. {
  136. if (observer == null)
  137. throw new ArgumentNullException("observer");
  138. if (scheduler == null)
  139. throw new ArgumentNullException("scheduler");
  140. return new ObserveOnObserver<T>(scheduler, observer, null);
  141. }
  142. #if !NO_SYNCCTX
  143. /// <summary>
  144. /// Schedules the observer messages on the given synchonization context.
  145. /// </summary>
  146. /// <param name="observer">The observer to schedule messages for.</param>
  147. /// <param name="context">Synchonization context to schedule observer messages on.</param>
  148. /// <returns>Observer whose messages are scheduled on the given synchonization context.</returns>
  149. public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, SynchronizationContext context)
  150. {
  151. if (observer == null)
  152. throw new ArgumentNullException("observer");
  153. if (context == null)
  154. throw new ArgumentNullException("context");
  155. return new ObserveOnObserver<T>(new SynchronizationContextScheduler(context), observer, null);
  156. }
  157. #endif
  158. #if HAS_PROGRESS
  159. /// <summary>
  160. /// Converts an observer to a progress object.
  161. /// </summary>
  162. /// <param name="observer">The observer to convert.</param>
  163. /// <returns>Progress object whose Report messages correspond to the observer's OnNext messages.</returns>
  164. public static IProgress<T> ToProgress<T>(this IObserver<T> observer)
  165. {
  166. if (observer == null)
  167. throw new ArgumentNullException("observer");
  168. return new AnonymousProgress<T>(observer.OnNext);
  169. }
  170. /// <summary>
  171. /// Converts an observer to a progress object.
  172. /// </summary>
  173. /// <param name="observer">The observer to convert.</param>
  174. /// <param name="scheduler">Scheduler to report progress on.</param>
  175. /// <returns>Progress object whose Report messages correspond to the observer's OnNext messages.</returns>
  176. public static IProgress<T> ToProgress<T>(this IObserver<T> observer, IScheduler scheduler)
  177. {
  178. if (observer == null)
  179. throw new ArgumentNullException("observer");
  180. if (scheduler == null)
  181. throw new ArgumentNullException("scheduler");
  182. return new AnonymousProgress<T>(new ObserveOnObserver<T>(scheduler, observer, null).OnNext);
  183. }
  184. class AnonymousProgress<T> : IProgress<T>
  185. {
  186. private readonly Action<T> _progress;
  187. public AnonymousProgress(Action<T> progress)
  188. {
  189. _progress = progress;
  190. }
  191. public void Report(T value)
  192. {
  193. _progress(value);
  194. }
  195. }
  196. /// <summary>
  197. /// Converts a progress object to an observer.
  198. /// </summary>
  199. /// <param name="progress">The progress object to convert.</param>
  200. /// <returns>Observer whose OnNext messages correspond to the progress object's Report messages.</returns>
  201. public static IObserver<T> ToObserver<T>(this IProgress<T> progress)
  202. {
  203. if (progress == null)
  204. throw new ArgumentNullException("progress");
  205. return Create<T>(progress.Report);
  206. }
  207. #endif
  208. }
  209. }