1
0

Observer.Extensions.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Threading;
  6. namespace System.Reactive
  7. {
  8. /// <summary>
  9. /// Provides a set of static methods for creating observers.
  10. /// </summary>
  11. public static class Observer
  12. {
  /// <summary>
  /// Wraps a notification callback in an observer.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="handler">Callback invoked with a notification for every message the observer receives.</param>
  /// <returns>An observer that turns each OnNext, OnError and OnCompleted message into the corresponding notification and passes it to <paramref name="handler"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
  public static IObserver<T> ToObserver<T>(this Action<Notification<T>> handler)
  {
      if (handler == null)
      {
          throw new ArgumentNullException("handler");
      }

      // One delegate per observer message kind; each builds the matching notification and forwards it.
      Action<T> forwardNext = value => handler(Notification.CreateOnNext<T>(value));
      Action<Exception> forwardError = error => handler(Notification.CreateOnError<T>(error));
      Action forwardCompleted = () => handler(Notification.CreateOnCompleted<T>());

      return new AnonymousObserver<T>(forwardNext, forwardError, forwardCompleted);
  }
  /// <summary>
  /// Exposes an observer as a callback that accepts notifications.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <returns>An action that calls <c>Accept</c> on each notification it is given, passing <paramref name="observer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static Action<Notification<T>> ToNotifier<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      Action<Notification<T>> notifier = notification => notification.Accept(observer);
      return notifier;
  }
  /// <summary>
  /// Builds an observer out of a single OnNext action.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <returns>An <c>AnonymousObserver</c> constructed from <paramref name="onNext"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext)
  {
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      IObserver<T> result = new AnonymousObserver<T>(onNext);
      return result;
  }
  /// <summary>
  /// Builds an observer out of an OnNext action and an OnError action.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <param name="onError">Action used as the observer's OnError implementation.</param>
  /// <returns>An <c>AnonymousObserver</c> constructed from the two actions.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
  {
      // Arguments are validated in declaration order, so onNext is reported first when both are null.
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onError == null)
      {
          throw new ArgumentNullException("onError");
      }

      IObserver<T> result = new AnonymousObserver<T>(onNext, onError);
      return result;
  }
  /// <summary>
  /// Builds an observer out of an OnNext action and an OnCompleted action.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <param name="onCompleted">Action used as the observer's OnCompleted implementation.</param>
  /// <returns>An <c>AnonymousObserver</c> constructed from the two actions.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
  {
      // Arguments are validated in declaration order, so onNext is reported first when both are null.
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException("onCompleted");
      }

      IObserver<T> result = new AnonymousObserver<T>(onNext, onCompleted);
      return result;
  }
  /// <summary>
  /// Builds an observer out of OnNext, OnError and OnCompleted actions.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="onNext">Action used as the observer's OnNext implementation.</param>
  /// <param name="onError">Action used as the observer's OnError implementation.</param>
  /// <param name="onCompleted">Action used as the observer's OnCompleted implementation.</param>
  /// <returns>An <c>AnonymousObserver</c> constructed from the three actions.</returns>
  /// <exception cref="ArgumentNullException">Any of <paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  {
      // Arguments are validated in declaration order; the first null one is the one reported.
      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onError == null)
      {
          throw new ArgumentNullException("onError");
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException("onCompleted");
      }

      IObserver<T> result = new AnonymousObserver<T>(onNext, onError, onCompleted);
      return result;
  }
  /// <summary>
  /// Wraps an observer in a new observer object so that the original instance is not exposed.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer whose identity should be hidden.</param>
  /// <returns>An <c>AnonymousObserver</c> whose three callbacks are the OnNext, OnError and OnCompleted methods of <paramref name="observer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static IObserver<T> AsObserver<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      // Bind the wrapped observer's methods as delegates; the wrapper only ever calls through these.
      Action<T> next = observer.OnNext;
      Action<Exception> error = observer.OnError;
      Action completed = observer.OnCompleted;

      return new AnonymousObserver<T>(next, error, completed);
  }
  /// <summary>
  /// Synchronizes the messages of an observer on a caller-supplied gate object.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer whose messages should be synchronized.</param>
  /// <param name="gate">Object on which each observer call is synchronized.</param>
  /// <returns>A <c>SynchronizedObserver</c> built from <paramref name="observer"/> and <paramref name="gate"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="gate"/> is null.</exception>
  public static IObserver<T> Synchronize<T>(IObserver<T> observer, object gate)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (gate == null)
      {
          throw new ArgumentNullException("gate");
      }

      IObserver<T> synchronized = new SynchronizedObserver<T>(observer, gate);
      return synchronized;
  }
  /// <summary>
  /// Synchronizes the messages of an observer on a newly created, private gate object.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer whose messages should be synchronized.</param>
  /// <returns>The observer whose messages are synchronized.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static IObserver<T> Synchronize<T>(IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      // A fresh object is used as the gate, so no other code holds a reference to it.
      object gate = new object();
      return Synchronize(observer, gate);
  }
  /// <summary>
  /// Schedules the messages of an observer on the given scheduler.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer whose messages should be scheduled.</param>
  /// <param name="scheduler">Scheduler on which the observer messages are scheduled.</param>
  /// <returns>An <c>ObserveOnObserver</c> wrapping <paramref name="observer"/> and using <paramref name="scheduler"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, IScheduler scheduler)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      // NOTE(review): the third constructor argument is deliberately null; its meaning is
      // defined by ObserveOnObserver, which is not visible here - confirm before changing.
      IObserver<T> scheduled = new ObserveOnObserver<T>(scheduler, observer, null);
      return scheduled;
  }
  144. #if !NO_SYNCCTX
  /// <summary>
  /// Schedules the messages of an observer on the given synchronization context.
  /// </summary>
  /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
  /// <param name="observer">Observer whose messages should be scheduled.</param>
  /// <param name="context">Synchronization context on which the observer messages are scheduled.</param>
  /// <returns>An <c>ObserveOnObserver</c> wrapping <paramref name="observer"/> and using a <c>SynchronizationContextScheduler</c> created from <paramref name="context"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="context"/> is null.</exception>
  public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, SynchronizationContext context)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (context == null)
      {
          throw new ArgumentNullException("context");
      }

      // Adapt the synchronization context to a scheduler, then reuse the scheduler-based wrapper type.
      var contextScheduler = new SynchronizationContextScheduler(context);

      // NOTE(review): the third constructor argument is deliberately null; its meaning is
      // defined by ObserveOnObserver, which is not visible here - confirm before changing.
      return new ObserveOnObserver<T>(contextScheduler, observer, null);
  }
  159. #endif
  160. #if HAS_PROGRESS
  /// <summary>
  /// Exposes an observer as a progress object.
  /// </summary>
  /// <typeparam name="T">Type of the progress values, which is also the observer's element type.</typeparam>
  /// <param name="observer">Observer that receives the reported values.</param>
  /// <returns>A progress object whose Report calls invoke the OnNext method of <paramref name="observer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static IProgress<T> ToProgress<T>(this IObserver<T> observer)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      Action<T> report = observer.OnNext;
      return new AnonymousProgress<T>(report);
  }
  /// <summary>
  /// Exposes an observer as a progress object that reports through the given scheduler.
  /// </summary>
  /// <typeparam name="T">Type of the progress values, which is also the observer's element type.</typeparam>
  /// <param name="observer">Observer that receives the reported values.</param>
  /// <param name="scheduler">Scheduler on which progress is reported.</param>
  /// <returns>A progress object whose Report calls invoke OnNext on an <c>ObserveOnObserver</c> wrapping <paramref name="observer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
  public static IProgress<T> ToProgress<T>(this IObserver<T> observer, IScheduler scheduler)
  {
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      // NOTE(review): the third constructor argument is deliberately null; its meaning is
      // defined by ObserveOnObserver, which is not visible here - confirm before changing.
      var scheduled = new ObserveOnObserver<T>(scheduler, observer, null);

      // Only OnNext is wired up: the progress object has a single Report method.
      return new AnonymousProgress<T>(scheduled.OnNext);
  }
  /// <summary>
  /// Progress implementation that hands every reported value to a delegate.
  /// </summary>
  /// <typeparam name="T">Type of the progress values.</typeparam>
  class AnonymousProgress<T> : IProgress<T>
  {
      // Callback invoked once per Report call; assigned in the constructor and never changed.
      private readonly Action<T> _callback;

      /// <summary>
      /// Creates a progress object around the given delegate.
      /// </summary>
      /// <param name="progress">Delegate invoked with each reported value. It is stored as given, without a null check.</param>
      public AnonymousProgress(Action<T> progress)
      {
          this._callback = progress;
      }

      /// <summary>
      /// Passes the reported value to the delegate supplied at construction.
      /// </summary>
      /// <param name="value">Value to report.</param>
      public void Report(T value)
      {
          this._callback(value);
      }
  }
  /// <summary>
  /// Exposes a progress object as an observer.
  /// </summary>
  /// <typeparam name="T">Type of the progress values, which is also the observer's element type.</typeparam>
  /// <param name="progress">Progress object that receives the observer's OnNext values.</param>
  /// <returns>An observer, created through <c>Create</c>, whose OnNext action is the Report method of <paramref name="progress"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="progress"/> is null.</exception>
  public static IObserver<T> ToObserver<T>(this IProgress<T> progress)
  {
      if (progress == null)
      {
          throw new ArgumentNullException("progress");
      }

      Action<T> report = progress.Report;
      return Create<T>(report);
  }
  209. #endif
  210. }
  211. }