Observable.Extensions.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.ComponentModel;
  3. using System.Reactive;
  4. using System.Reactive.Disposables;
  5. namespace System
  6. {
  7. /// <summary>
  8. /// Provides a set of static methods for subscribing delegates to observables.
  9. /// </summary>
  10. public static class ObservableExtensions
  11. {
  12. /// <summary>
  13. /// Subscribes to the observable sequence without specifying any handlers.
  14. /// This method can be used to evaluate the observable sequence for its side-effects only.
  15. /// </summary>
  16. /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
  17. /// <param name="source">Observable sequence to subscribe to.</param>
  18. /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
  19. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  20. public static IDisposable Subscribe<T>(this IObservable<T> source)
  21. {
  22. if (source == null)
  23. throw new ArgumentNullException("source");
  24. //
  25. // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
  26. //
  27. return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(Stubs<T>.Ignore, Stubs.Throw, Stubs.Nop));
  28. }
  29. /// <summary>
  30. /// Subscribes an element handler to an observable sequence.
  31. /// </summary>
  32. /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
  33. /// <param name="source">Observable sequence to subscribe to.</param>
  34. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  35. /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
  36. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  37. public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
  38. {
  39. if (source == null)
  40. throw new ArgumentNullException("source");
  41. if (onNext == null)
  42. throw new ArgumentNullException("onNext");
  43. //
  44. // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
  45. //
  46. return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, Stubs.Throw, Stubs.Nop));
  47. }
  48. /// <summary>
  49. /// Subscribes an element handler and an exception handler to an observable sequence.
  50. /// </summary>
  51. /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
  52. /// <param name="source">Observable sequence to subscribe to.</param>
  53. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  54. /// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
  55. /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
  56. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
  57. public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
  58. {
  59. if (source == null)
  60. throw new ArgumentNullException("source");
  61. if (onNext == null)
  62. throw new ArgumentNullException("onNext");
  63. if (onError == null)
  64. throw new ArgumentNullException("onError");
  65. //
  66. // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
  67. //
  68. return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, onError, Stubs.Nop));
  69. }
  70. /// <summary>
  71. /// Subscribes an element handler and a completion handler to an observable sequence.
  72. /// </summary>
  73. /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
  74. /// <param name="source">Observable sequence to subscribe to.</param>
  75. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  76. /// <param name="onCompleted">Action to invoke upon graceful termination of the observable sequence.</param>
  77. /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
  78. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
  79. public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
  80. {
  81. if (source == null)
  82. throw new ArgumentNullException("source");
  83. if (onNext == null)
  84. throw new ArgumentNullException("onNext");
  85. if (onCompleted == null)
  86. throw new ArgumentNullException("onCompleted");
  87. //
  88. // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
  89. //
  90. return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, Stubs.Throw, onCompleted));
  91. }
  92. /// <summary>
  93. /// Subscribes an element handler, an exception handler, and a completion handler to an observable sequence.
  94. /// </summary>
  95. /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
  96. /// <param name="source">Observable sequence to subscribe to.</param>
  97. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  98. /// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
  99. /// <param name="onCompleted">Action to invoke upon graceful termination of the observable sequence.</param>
  100. /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
  101. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  102. public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
  103. {
  104. if (source == null)
  105. throw new ArgumentNullException("source");
  106. if (onNext == null)
  107. throw new ArgumentNullException("onNext");
  108. if (onError == null)
  109. throw new ArgumentNullException("onError");
  110. if (onCompleted == null)
  111. throw new ArgumentNullException("onCompleted");
  112. //
  113. // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
  114. //
  115. return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, onError, onCompleted));
  116. }
  117. /// <summary>
  118. /// Subscribes to the specified source, re-routing synchronous exceptions during invocation of the Subscribe method to the observer's OnError channel.
  119. /// This method is typically used when writing query operators.
  120. /// </summary>
  121. /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
  122. /// <param name="source">Observable sequence to subscribe to.</param>
  123. /// <param name="observer">Observer that will be passed to the observable sequence, and that will be used for exception propagation.</param>
  124. /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
  125. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="observer"/> is null.</exception>
  126. [EditorBrowsable(EditorBrowsableState.Advanced)]
  127. public static IDisposable SubscribeSafe<T>(this IObservable<T> source, IObserver<T> observer)
  128. {
  129. if (source == null)
  130. throw new ArgumentNullException("source");
  131. if (observer == null)
  132. throw new ArgumentNullException("observer");
  133. //
  134. // The following types are white-listed and should not exhibit exceptional behavior
  135. // for regular operation circumstances.
  136. //
  137. if (source is ObservableBase<T>)
  138. return source.Subscribe(observer);
  139. #if !NO_PERF
  140. var producer = source as Producer<T>;
  141. if (producer != null)
  142. return producer.SubscribeRaw(observer, false);
  143. #endif
  144. var d = Disposable.Empty;
  145. try
  146. {
  147. d = source.Subscribe(observer);
  148. }
  149. catch (Exception exception)
  150. {
  151. //
  152. // The effect of redirecting the exception to the OnError channel is automatic
  153. // clean-up of query operator state for a large number of cases. For example,
  154. // consider a binary and temporal query operator with the following Subscribe
  155. // behavior (implemented using the Producer pattern with a Run method):
  156. //
  157. // public IDisposable Run(...)
  158. // {
  159. // var tm = _scheduler.Schedule(_due, Tick);
  160. //
  161. // var df = _fst.SubscribeSafe(new FstObserver(this, ...));
  162. // var ds = _snd.SubscribeSafe(new SndObserver(this, ...)); // <-- fails
  163. //
  164. // return new CompositeDisposable(tm, df, ds);
  165. // }
  166. //
  167. // If the second subscription fails, we're not leaving the first subscription
  168. // or the scheduled job hanging around. Instead, the OnError propagation to
  169. // the SndObserver should take care of a Dispose call to the observer's parent
  170. // object. The handshake between Producer and Sink objects will ultimately
  171. // cause disposal of the CompositeDisposable that's returned from the method.
  172. //
  173. observer.OnError(exception);
  174. }
  175. return d;
  176. }
  177. }
  178. }