Observer.Extensions.cs 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. using System.Reactive.Concurrency;
  2. using System.Threading;
  3. namespace System.Reactive
  4. {
  5. /// <summary>
  6. /// Provides a set of static methods for creating observers.
  7. /// </summary>
  8. public static class Observer
  9. {
  /// <summary>
  /// Builds an observer that funnels every message it receives into a single notification callback.
  /// </summary>
  /// <typeparam name="T">Type of the elements carried by the notifications.</typeparam>
  /// <param name="handler">Callback that is invoked with a notification for each message.</param>
  /// <returns>An observer that wraps each OnNext, OnError and OnCompleted message in the corresponding notification and passes it to <paramref name="handler"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
  public static IObserver<T> ToObserver<T>(this Action<Notification<T>> handler)
  {
      if (handler == null)
      {
          throw new ArgumentNullException("handler");
      }

      // One adapter per observer message kind; each materializes the message as a notification.
      Action<T> forwardNext = value => handler(Notification.CreateOnNext<T>(value));
      Action<Exception> forwardError = error => handler(Notification.CreateOnError<T>(error));
      Action forwardCompleted = () => handler(Notification.CreateOnCompleted<T>());

      return new AnonymousObserver<T>(forwardNext, forwardError, forwardCompleted);
  }
  /// <summary>
  /// Turns an observer into a callback that accepts notifications.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer that should receive the notifications.</param>
  /// <returns>An action that, when given a notification, has that notification accept <paramref name="observer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static Action<Notification<T>> ToNotifier<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      Action<Notification<T>> notifier = notification => notification.Accept(observer);
      return notifier;
  }
  /// <summary>
  /// Builds an observer out of an OnNext action alone.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <returns>An observer implemented with the supplied action.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext)
  {
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      IObserver<T> created = new AnonymousObserver<T>(onNext);
      return created;
  }
  /// <summary>
  /// Builds an observer out of an OnNext action and an OnError action.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <param name="onError">Action used as the observer's OnError implementation.</param>
  /// <returns>An observer implemented with the supplied actions.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
  {
      // Arguments are validated in declaration order, so onNext is reported first.
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onError == null)
      {
          throw new ArgumentNullException("onError");
      }

      IObserver<T> created = new AnonymousObserver<T>(onNext, onError);
      return created;
  }
  /// <summary>
  /// Builds an observer out of an OnNext action and an OnCompleted action.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <param name="onCompleted">Action used as the observer's OnCompleted implementation.</param>
  /// <returns>An observer implemented with the supplied actions.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
  {
      // Arguments are validated in declaration order, so onNext is reported first.
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException("onCompleted");
      }

      IObserver<T> created = new AnonymousObserver<T>(onNext, onCompleted);
      return created;
  }
  /// <summary>
  /// Builds an observer out of OnNext, OnError and OnCompleted actions.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <param name="onError">Action used as the observer's OnError implementation.</param>
  /// <param name="onCompleted">Action used as the observer's OnCompleted implementation.</param>
  /// <returns>An observer implemented with the supplied actions.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  {
      // Arguments are validated in declaration order: onNext, then onError, then onCompleted.
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onError == null)
      {
          throw new ArgumentNullException("onError");
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException("onCompleted");
      }

      IObserver<T> created = new AnonymousObserver<T>(onNext, onError, onCompleted);
      return created;
  }
  /// <summary>
  /// Conceals the identity of an observer behind a new observer instance.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer whose identity should be hidden.</param>
  /// <returns>A new observer whose OnNext, OnError and OnCompleted are bound to the corresponding methods of <paramref name="observer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static IObserver<T> AsObserver<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      // Bind the three observer methods as delegates so callers only ever see the wrapper.
      Action<T> next = observer.OnNext;
      Action<Exception> error = observer.OnError;
      Action completed = observer.OnCompleted;

      return new AnonymousObserver<T>(next, error, completed);
  }
  /// <summary>
  /// Synchronizes the messages of an observer on a caller-supplied gate object.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer whose messages should be synchronized.</param>
  /// <param name="gate">Gate object on which each observer call is synchronized.</param>
  /// <returns>An observer whose messages are synchronized on <paramref name="gate"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="gate"/> is null.</exception>
  public static IObserver<T> Synchronize<T>(IObserver<T> observer, object gate)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (gate == null)
      {
          throw new ArgumentNullException("gate");
      }

      IObserver<T> synchronized = new SynchronizedObserver<T>(observer, gate);
      return synchronized;
  }
  /// <summary>
  /// Synchronizes the messages of an observer on a newly created gate object.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer whose messages should be synchronized.</param>
  /// <returns>An observer whose messages are synchronized.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static IObserver<T> Synchronize<T>(IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      // No gate was supplied, so a fresh object is created to serve as one.
      object gate = new object();
      return Synchronize<T>(observer, gate);
  }
  /// <summary>
  /// Wraps an observer so that its messages are scheduled on the given scheduler.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer for which messages should be scheduled.</param>
  /// <param name="scheduler">Scheduler on which the observer messages are scheduled.</param>
  /// <returns>An observer whose messages are scheduled on <paramref name="scheduler"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, IScheduler scheduler)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      IObserver<T> scheduled = new ObserveOnObserver<T>(scheduler, observer, null);
      return scheduled;
  }
  141. #if !NO_SYNCCTX
  /// <summary>
  /// Wraps an observer so that its messages are scheduled on the given synchronization context.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer for which messages should be scheduled.</param>
  /// <param name="context">Synchronization context on which the observer messages are scheduled.</param>
  /// <returns>An observer whose messages are scheduled on <paramref name="context"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="context"/> is null.</exception>
  public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, SynchronizationContext context)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (context == null)
      {
          throw new ArgumentNullException("context");
      }

      // The context is adapted to a scheduler, then handled like the scheduler-based overload.
      var contextScheduler = new SynchronizationContextScheduler(context);
      return new ObserveOnObserver<T>(contextScheduler, observer, null);
  }
  156. #endif
  157. #if HAS_PROGRESS
  /// <summary>
  /// Exposes an observer as a progress object.
  /// </summary>
  /// <typeparam name="T">Type of the progress values, which is also the observer's element type.</typeparam>
  /// <param name="observer">Observer to expose as a progress object.</param>
  /// <returns>A progress object whose Report calls are passed to the observer's OnNext.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static IProgress<T> ToProgress<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      // Only OnNext is wired up; the progress object has no use for OnError or OnCompleted.
      Action<T> report = observer.OnNext;
      return new AnonymousProgress<T>(report);
  }
  /// <summary>
  /// Exposes an observer as a progress object that reports on the given scheduler.
  /// </summary>
  /// <typeparam name="T">Type of the progress values, which is also the observer's element type.</typeparam>
  /// <param name="observer">Observer to expose as a progress object.</param>
  /// <param name="scheduler">Scheduler on which progress is reported.</param>
  /// <returns>A progress object whose Report calls correspond to the observer's OnNext messages.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
  public static IProgress<T> ToProgress<T>(this IObserver<T> observer, IScheduler scheduler)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      // Report calls go to the OnNext of a scheduler-bound wrapper rather than to the observer directly.
      var scheduled = new ObserveOnObserver<T>(scheduler, observer, null);
      return new AnonymousProgress<T>(scheduled.OnNext);
  }
  /// <summary>
  /// Progress object that hands every reported value to a delegate.
  /// </summary>
  /// <typeparam name="T">Type of the progress values.</typeparam>
  class AnonymousProgress<T> : IProgress<T>
  {
      // Callback that receives each reported value.
      private readonly Action<T> _onReport;

      /// <summary>
      /// Creates a progress object around the given callback.
      /// </summary>
      /// <param name="progress">Action invoked for every reported value; it is stored without a null check.</param>
      public AnonymousProgress(Action<T> progress)
      {
          this._onReport = progress;
      }

      /// <summary>
      /// Passes the reported value to the stored callback.
      /// </summary>
      /// <param name="value">Progress value being reported.</param>
      public void Report(T value)
      {
          this._onReport(value);
      }
  }
  /// <summary>
  /// Exposes a progress object as an observer.
  /// </summary>
  /// <typeparam name="T">Type of the progress values, which is also the observer's element type.</typeparam>
  /// <param name="progress">Progress object to expose as an observer.</param>
  /// <returns>An observer whose OnNext messages are passed to the progress object's Report method.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="progress"/> is null.</exception>
  public static IObserver<T> ToObserver<T>(this IProgress<T> progress)
  {
      if (progress == null)
      {
          throw new ArgumentNullException("progress");
      }

      // Report becomes the OnNext action; the single-action Create overload is used.
      Action<T> report = progress.Report;
      return Create<T>(report);
  }
  206. #endif
  207. }
  208. }