Do.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading.Tasks;
  5. namespace System.Reactive.Linq
  6. {
  7. partial class AsyncObservable
  8. {
  9. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, IObserver<TSource> observer)
  10. {
  11. if (source == null)
  12. throw new ArgumentNullException(nameof(source));
  13. if (observer == null)
  14. throw new ArgumentNullException(nameof(observer));
  15. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, observer)));
  16. }
  17. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Action<TSource> onNext)
  18. {
  19. if (source == null)
  20. throw new ArgumentNullException(nameof(source));
  21. if (onNext == null)
  22. throw new ArgumentNullException(nameof(onNext));
  23. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, onNext)));
  24. }
  25. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Action<Exception> onError)
  26. {
  27. if (source == null)
  28. throw new ArgumentNullException(nameof(source));
  29. if (onError == null)
  30. throw new ArgumentNullException(nameof(onError));
  31. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, onError)));
  32. }
  33. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Action onCompleted)
  34. {
  35. if (source == null)
  36. throw new ArgumentNullException(nameof(source));
  37. if (onCompleted == null)
  38. throw new ArgumentNullException(nameof(onCompleted));
  39. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, onCompleted)));
  40. }
  41. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  42. {
  43. if (source == null)
  44. throw new ArgumentNullException(nameof(source));
  45. if (onNext == null)
  46. throw new ArgumentNullException(nameof(onNext));
  47. if (onError == null)
  48. throw new ArgumentNullException(nameof(onError));
  49. if (onCompleted == null)
  50. throw new ArgumentNullException(nameof(onCompleted));
  51. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, onNext, onError, onCompleted)));
  52. }
  53. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, IAsyncObserver<TSource> observer)
  54. {
  55. if (source == null)
  56. throw new ArgumentNullException(nameof(source));
  57. if (observer == null)
  58. throw new ArgumentNullException(nameof(observer));
  59. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, observer)));
  60. }
  61. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask> onNext)
  62. {
  63. if (source == null)
  64. throw new ArgumentNullException(nameof(source));
  65. if (onNext == null)
  66. throw new ArgumentNullException(nameof(onNext));
  67. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, onNext)));
  68. }
  69. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Func<Exception, ValueTask> onError)
  70. {
  71. if (source == null)
  72. throw new ArgumentNullException(nameof(source));
  73. if (onError == null)
  74. throw new ArgumentNullException(nameof(onError));
  75. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, onError)));
  76. }
  77. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Func<ValueTask> onCompleted)
  78. {
  79. if (source == null)
  80. throw new ArgumentNullException(nameof(source));
  81. if (onCompleted == null)
  82. throw new ArgumentNullException(nameof(onCompleted));
  83. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, onCompleted)));
  84. }
  85. public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask> onNext, Func<Exception, ValueTask> onError, Func<ValueTask> onCompleted)
  86. {
  87. if (source == null)
  88. throw new ArgumentNullException(nameof(source));
  89. if (onNext == null)
  90. throw new ArgumentNullException(nameof(onNext));
  91. if (onError == null)
  92. throw new ArgumentNullException(nameof(onError));
  93. if (onCompleted == null)
  94. throw new ArgumentNullException(nameof(onCompleted));
  95. return Create<TSource>(target => source.SubscribeSafeAsync(AsyncObserver.Do(target, onNext, onError, onCompleted)));
  96. }
  97. }
  98. partial class AsyncObserver
  99. {
  100. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, IAsyncObserver<TSource> witness)
  101. {
  102. if (observer == null)
  103. throw new ArgumentNullException(nameof(observer));
  104. if (witness == null)
  105. throw new ArgumentNullException(nameof(witness));
  106. return Create<TSource>(
  107. async x =>
  108. {
  109. try
  110. {
  111. await witness.OnNextAsync(x).ConfigureAwait(false);
  112. }
  113. catch (Exception ex)
  114. {
  115. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  116. return;
  117. }
  118. await observer.OnNextAsync(x).ConfigureAwait(false);
  119. },
  120. async error =>
  121. {
  122. try
  123. {
  124. await witness.OnErrorAsync(error).ConfigureAwait(false);
  125. }
  126. catch (Exception ex)
  127. {
  128. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  129. return;
  130. }
  131. await observer.OnErrorAsync(error).ConfigureAwait(false);
  132. },
  133. async () =>
  134. {
  135. try
  136. {
  137. await witness.OnCompletedAsync().ConfigureAwait(false);
  138. }
  139. catch (Exception ex)
  140. {
  141. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  142. return;
  143. }
  144. await observer.OnCompletedAsync().ConfigureAwait(false);
  145. }
  146. );
  147. }
  148. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Func<TSource, ValueTask> onNext)
  149. {
  150. if (observer == null)
  151. throw new ArgumentNullException(nameof(observer));
  152. if (onNext == null)
  153. throw new ArgumentNullException(nameof(onNext));
  154. return Do(observer, Create(onNext));
  155. }
  156. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Func<Exception, ValueTask> onError)
  157. {
  158. if (observer == null)
  159. throw new ArgumentNullException(nameof(observer));
  160. if (onError == null)
  161. throw new ArgumentNullException(nameof(onError));
  162. return Do(observer, Create<TSource>(_ => default, onError, () => default));
  163. }
  164. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Func<ValueTask> onCompleted)
  165. {
  166. if (observer == null)
  167. throw new ArgumentNullException(nameof(observer));
  168. if (onCompleted == null)
  169. throw new ArgumentNullException(nameof(onCompleted));
  170. return Do(observer, Create<TSource>(_ => default, _ => default, onCompleted));
  171. }
  172. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Func<TSource, ValueTask> onNext, Func<Exception, ValueTask> onError, Func<ValueTask> onCompleted)
  173. {
  174. if (observer == null)
  175. throw new ArgumentNullException(nameof(observer));
  176. if (onNext == null)
  177. throw new ArgumentNullException(nameof(onNext));
  178. if (onError == null)
  179. throw new ArgumentNullException(nameof(onError));
  180. if (onCompleted == null)
  181. throw new ArgumentNullException(nameof(onCompleted));
  182. return Do(observer, Create(onNext, onError, onCompleted));
  183. }
  184. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, IObserver<TSource> witness)
  185. {
  186. if (observer == null)
  187. throw new ArgumentNullException(nameof(observer));
  188. if (witness == null)
  189. throw new ArgumentNullException(nameof(witness));
  190. return Create<TSource>(
  191. async x =>
  192. {
  193. try
  194. {
  195. witness.OnNext(x);
  196. }
  197. catch (Exception ex)
  198. {
  199. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  200. return;
  201. }
  202. await observer.OnNextAsync(x).ConfigureAwait(false);
  203. },
  204. async error =>
  205. {
  206. try
  207. {
  208. witness.OnError(error);
  209. }
  210. catch (Exception ex)
  211. {
  212. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  213. return;
  214. }
  215. await observer.OnErrorAsync(error).ConfigureAwait(false);
  216. },
  217. async () =>
  218. {
  219. try
  220. {
  221. witness.OnCompleted();
  222. }
  223. catch (Exception ex)
  224. {
  225. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  226. return;
  227. }
  228. await observer.OnCompletedAsync().ConfigureAwait(false);
  229. }
  230. );
  231. }
  232. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Action<TSource> onNext)
  233. {
  234. if (observer == null)
  235. throw new ArgumentNullException(nameof(observer));
  236. if (onNext == null)
  237. throw new ArgumentNullException(nameof(onNext));
  238. return Do(observer, x => { onNext(x); return default; });
  239. }
  240. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Action<Exception> onError)
  241. {
  242. if (observer == null)
  243. throw new ArgumentNullException(nameof(observer));
  244. if (onError == null)
  245. throw new ArgumentNullException(nameof(onError));
  246. return Do(observer, Create<TSource>(_ => default, ex => { onError(ex); return default; }, () => default));
  247. }
  248. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Action onCompleted)
  249. {
  250. if (observer == null)
  251. throw new ArgumentNullException(nameof(observer));
  252. if (onCompleted == null)
  253. throw new ArgumentNullException(nameof(onCompleted));
  254. return Do(observer, Create<TSource>(_ => default, _ => default, () => { onCompleted(); return default; }));
  255. }
  256. public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  257. {
  258. if (observer == null)
  259. throw new ArgumentNullException(nameof(observer));
  260. if (onNext == null)
  261. throw new ArgumentNullException(nameof(onNext));
  262. if (onError == null)
  263. throw new ArgumentNullException(nameof(onError));
  264. if (onCompleted == null)
  265. throw new ArgumentNullException(nameof(onCompleted));
  266. return Do(observer, x => { onNext(x); return default; }, ex => { onError(ex); return default; }, () => { onCompleted(); return default; });
  267. }
  268. }
  269. }