Observable.Extensions.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.ComponentModel;
  5. using System.Reactive;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System
  9. {
  10. /// <summary>
  11. /// Provides a set of static methods for subscribing delegates to observables.
  12. /// </summary>
  13. public static class ObservableExtensions
  14. {
  15. #region Subscribe delegate-based overloads
  /// <summary>
  /// Subscribes to an observable sequence without supplying any handlers.
  /// Useful when the sequence only needs to be run for the side-effects it produces.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <returns>An <see cref="IDisposable"/> that unsubscribes from the sequence when disposed.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IDisposable Subscribe<T>(this IObservable<T> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // No caller-supplied callbacks: all three observer slots are filled with the shared stubs.
      var observer = new AnonymousObserver<T>(Stubs<T>.Ignore, Stubs.Throw, Stubs.Nop);

      // [OK] Unsafe Subscribe is acceptable here: the observer comes from a plain constructor,
      //      and this overload is not meant to be used internally.
      return source.Subscribe/*Unsafe*/(observer);
  }
  /// <summary>
  /// Subscribes to an observable sequence, handling only its elements.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="onNext">Callback invoked for every element the sequence produces.</param>
  /// <returns>An <see cref="IDisposable"/> that unsubscribes from the sequence when disposed.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      // Error and completion slots are filled with the shared stubs.
      var observer = new AnonymousObserver<T>(onNext, Stubs.Throw, Stubs.Nop);

      // [OK] Unsafe Subscribe is acceptable here: the observer comes from a plain constructor,
      //      and this overload is not meant to be used internally.
      return source.Subscribe/*Unsafe*/(observer);
  }
  /// <summary>
  /// Subscribes to an observable sequence, handling its elements and its exceptional termination.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="onNext">Callback invoked for every element the sequence produces.</param>
  /// <param name="onError">Callback invoked when the sequence terminates with an exception.</param>
  /// <returns>An <see cref="IDisposable"/> that unsubscribes from the sequence when disposed.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
  public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onError == null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      // Only the completion slot falls back to the shared stub.
      var observer = new AnonymousObserver<T>(onNext, onError, Stubs.Nop);

      // [OK] Unsafe Subscribe is acceptable here: the observer comes from a plain constructor,
      //      and this overload is not meant to be used internally.
      return source.Subscribe/*Unsafe*/(observer);
  }
  /// <summary>
  /// Subscribes to an observable sequence, handling its elements and its graceful completion.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="onNext">Callback invoked for every element the sequence produces.</param>
  /// <param name="onCompleted">Callback invoked when the sequence terminates gracefully.</param>
  /// <returns>An <see cref="IDisposable"/> that unsubscribes from the sequence when disposed.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      // Only the error slot falls back to the shared stub.
      var observer = new AnonymousObserver<T>(onNext, Stubs.Throw, onCompleted);

      // [OK] Unsafe Subscribe is acceptable here: the observer comes from a plain constructor,
      //      and this overload is not meant to be used internally.
      return source.Subscribe/*Unsafe*/(observer);
  }
  /// <summary>
  /// Subscribes to an observable sequence with handlers for elements, exceptional termination,
  /// and graceful completion.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="onNext">Callback invoked for every element the sequence produces.</param>
  /// <param name="onError">Callback invoked when the sequence terminates with an exception.</param>
  /// <param name="onCompleted">Callback invoked when the sequence terminates gracefully.</param>
  /// <returns>An <see cref="IDisposable"/> that unsubscribes from the sequence when disposed.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onError == null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      // All three callbacks are caller-supplied; they are bundled into a single observer.
      var observer = new AnonymousObserver<T>(onNext, onError, onCompleted);

      // [OK] Unsafe Subscribe is acceptable here: the observer comes from a plain constructor,
      //      and this overload is not meant to be used internally.
      return source.Subscribe/*Unsafe*/(observer);
  }
  121. #endregion
  122. #region Subscribe overloads with CancellationToken
  /// <summary>
  /// Subscribes the given observer to an observable sequence; unsubscription is driven by a
  /// CancellationToken rather than by a returned disposable.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="observer">The observer to attach to the sequence.</param>
  /// <param name="token">Token whose cancellation unsubscribes from the source sequence.</param>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="observer"/> is null.</exception>
  public static void Subscribe<T>(this IObservable<T> source, IObserver<T> observer, CancellationToken token)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (observer == null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      // Arguments are validated; the shared private helper does the token-aware subscription.
      Subscribe_(source, observer, token);
  }
  /// <summary>
  /// Subscribes to an observable sequence without supplying any handlers; unsubscription is
  /// driven by a CancellationToken. Useful when the sequence only needs to be run for the
  /// side-effects it produces.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="token">Token whose cancellation unsubscribes from the source sequence.</param>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static void Subscribe<T>(this IObservable<T> source, CancellationToken token)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // No caller-supplied callbacks: all three observer slots are filled with the shared stubs.
      var observer = new AnonymousObserver<T>(Stubs<T>.Ignore, Stubs.Throw, Stubs.Nop);
      Subscribe_(source, observer, token);
  }
  /// <summary>
  /// Subscribes to an observable sequence, handling only its elements; unsubscription is
  /// driven by a CancellationToken.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="onNext">Callback invoked for every element the sequence produces.</param>
  /// <param name="token">Token whose cancellation unsubscribes from the source sequence.</param>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  public static void Subscribe<T>(this IObservable<T> source, Action<T> onNext, CancellationToken token)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      // Error and completion slots are filled with the shared stubs.
      var observer = new AnonymousObserver<T>(onNext, Stubs.Throw, Stubs.Nop);
      Subscribe_(source, observer, token);
  }
  /// <summary>
  /// Subscribes to an observable sequence, handling its elements and its exceptional
  /// termination; unsubscription is driven by a CancellationToken.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="onNext">Callback invoked for every element the sequence produces.</param>
  /// <param name="onError">Callback invoked when the sequence terminates with an exception.</param>
  /// <param name="token">Token whose cancellation unsubscribes from the source sequence.</param>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
  public static void Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, CancellationToken token)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onError == null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      // Only the completion slot falls back to the shared stub.
      var observer = new AnonymousObserver<T>(onNext, onError, Stubs.Nop);
      Subscribe_(source, observer, token);
  }
  /// <summary>
  /// Subscribes to an observable sequence, handling its elements and its graceful completion;
  /// unsubscription is driven by a CancellationToken.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="onNext">Callback invoked for every element the sequence produces.</param>
  /// <param name="onCompleted">Callback invoked when the sequence terminates gracefully.</param>
  /// <param name="token">Token whose cancellation unsubscribes from the source sequence.</param>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
  public static void Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted, CancellationToken token)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      // Only the error slot falls back to the shared stub.
      var observer = new AnonymousObserver<T>(onNext, Stubs.Throw, onCompleted);
      Subscribe_(source, observer, token);
  }
  /// <summary>
  /// Subscribes to an observable sequence with handlers for elements, exceptional termination,
  /// and graceful completion; unsubscription is driven by a CancellationToken.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="onNext">Callback invoked for every element the sequence produces.</param>
  /// <param name="onError">Callback invoked when the sequence terminates with an exception.</param>
  /// <param name="onCompleted">Callback invoked when the sequence terminates gracefully.</param>
  /// <param name="token">Token whose cancellation unsubscribes from the source sequence.</param>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  public static void Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted, CancellationToken token)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onError == null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      // All three callbacks are caller-supplied; they are bundled into a single observer.
      var observer = new AnonymousObserver<T>(onNext, onError, onCompleted);
      Subscribe_(source, observer, token);
  }
  // Shared implementation behind the CancellationToken-based Subscribe overloads.
  // Callers validate source and observer before getting here.
  //
  //  - Token that can never be canceled: subscribe the observer directly.
  //  - Token already canceled: do nothing (no subscription is made).
  //  - Otherwise: subscribe, then register the subscription's Dispose with the token. The
  //    registration is released when the sequence terminates through OnError or OnCompleted.
  private static void Subscribe_<T>(this IObservable<T> source, IObserver<T> observer, CancellationToken token)
  {
      if (!token.CanBeCanceled)
      {
          source.Subscribe(observer);
          return;
      }

      if (token.IsCancellationRequested)
      {
          return;
      }

      // Holds the token registration; assigned only after Subscribe has returned.
      var registration = new SingleAssignmentDisposable();

      Action<T> forwardNext = observer.OnNext;

      Action<Exception> forwardError = error =>
      {
          // Forward the error, then release the token registration (even if the observer throws).
          using (registration)
          {
              observer.OnError(error);
          }
      };

      Action forwardCompleted = () =>
      {
          // Forward the completion, then release the token registration (even if the observer throws).
          using (registration)
          {
              observer.OnCompleted();
          }
      };

      // [OK] Unsafe Subscribe is acceptable here: the token registration is only created after
      //      Subscribe returns, so an exception during Subscribe cannot orphan a
      //      CancellationTokenRegistration.
      var subscription = source.Subscribe/*Unsafe*/(forwardNext, forwardError, forwardCompleted);

      registration.Disposable = token.Register(subscription.Dispose);
  }
  260. #endregion
  261. #region SubscribeSafe
  /// <summary>
  /// Subscribes the observer to the source and, if the Subscribe call itself throws synchronously,
  /// delivers that exception through the observer's OnError channel instead of letting it escape.
  /// Intended mainly for authors of query operators.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to subscribe to.</param>
  /// <param name="observer">Observer handed to the sequence; also the target for exceptions raised while subscribing.</param>
  /// <returns>An <see cref="IDisposable"/> that unsubscribes from the sequence when disposed.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="observer"/> is null.</exception>
  [EditorBrowsable(EditorBrowsableState.Advanced)]
  public static IDisposable SubscribeSafe<T>(this IObservable<T> source, IObserver<T> observer)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (observer == null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      //
      // White-listed source types: these should not exhibit exceptional behavior under
      // regular operating circumstances, so they are subscribed to without the try/catch.
      //
      if (source is ObservableBase<T>)
      {
          return source.Subscribe(observer);
      }

  #if !NO_PERF
      var producer = source as IProducer<T>;
      if (producer != null)
      {
          return producer.SubscribeRaw(observer, false);
      }
  #endif

      try
      {
          return source.Subscribe(observer);
      }
      catch (Exception error)
      {
          //
          // Routing the exception into the OnError channel gives automatic clean-up of
          // query operator state in a large number of cases. As an example, take a binary,
          // temporal query operator whose Subscribe behavior (written with the Producer
          // pattern and a Run method) looks like this:
          //
          //   public IDisposable Run(...)
          //   {
          //       var tm = _scheduler.Schedule(_due, Tick);
          //
          //       var df = _fst.SubscribeSafe(new FstObserver(this, ...));
          //       var ds = _snd.SubscribeSafe(new SndObserver(this, ...)); // <-- fails
          //
          //       return new CompositeDisposable(tm, df, ds);
          //   }
          //
          // When the second subscription fails, neither the first subscription nor the
          // scheduled job is left hanging around. The OnError propagation to the SndObserver
          // should instead result in a Dispose call on the observer's parent object, and the
          // handshake between Producer and Sink objects ultimately disposes the
          // CompositeDisposable returned from the method.
          //
          observer.OnError(error);

          // Nothing was subscribed, so hand back the no-op disposable.
          return Disposable.Empty;
      }
  }
  322. #endregion
  323. }
  324. }