// Do.cs (14 KB)
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading.Tasks;
  5. namespace System.Reactive.Linq
  6. {
  7. public partial class AsyncObservable
  8. {
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and <paramref name="observer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="observer">Synchronous observer handed to <c>AsyncObserver.Do</c> as the witness.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="observer"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, IObserver<TSource> observer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      // The lambda is static: everything it needs arrives through its parameters.
      return Create(
          source,
          observer,
          static (upstream, witness, downstream) => upstream.SubscribeSafeAsync(AsyncObserver.Do(downstream, witness)));
  }
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and <paramref name="onNext"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="onNext">Synchronous element callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="onNext"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Action<TSource> onNext)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext is null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      // The lambda is static: everything it needs arrives through its parameters.
      return Create(
          source,
          onNext,
          static (upstream, callback, downstream) => upstream.SubscribeSafeAsync(AsyncObserver.Do(downstream, callback)));
  }
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and <paramref name="onError"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="onError">Synchronous error callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="onError"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Action<Exception> onError)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onError is null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      // The lambda is static: everything it needs arrives through its parameters.
      return Create(
          source,
          onError,
          static (upstream, callback, downstream) => upstream.SubscribeSafeAsync(AsyncObserver.Do(downstream, callback)));
  }
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and <paramref name="onCompleted"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="onCompleted">Synchronous completion callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="onCompleted"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Action onCompleted)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onCompleted is null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      // The lambda is static: everything it needs arrives through its parameters.
      return Create(
          source,
          onCompleted,
          static (upstream, callback, downstream) => upstream.SubscribeSafeAsync(AsyncObserver.Do(downstream, callback)));
  }
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and the three synchronous callbacks.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="onNext">Synchronous element callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <param name="onError">Synchronous error callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <param name="onCompleted">Synchronous completion callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext is null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onError is null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      if (onCompleted is null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      // The callbacks travel as one tuple-valued state argument so the lambda can stay static.
      return Create(
          source,
          (onNext, onError, onCompleted),
          static (upstream, handlers, downstream) => upstream.SubscribeSafeAsync(
              AsyncObserver.Do(downstream, handlers.onNext, handlers.onError, handlers.onCompleted)));
  }
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and <paramref name="observer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="observer">Asynchronous observer handed to <c>AsyncObserver.Do</c> as the witness.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="observer"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, IAsyncObserver<TSource> observer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      // The lambda is static: everything it needs arrives through its parameters.
      return Create(
          source,
          observer,
          static (upstream, witness, downstream) => upstream.SubscribeSafeAsync(AsyncObserver.Do(downstream, witness)));
  }
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and <paramref name="onNext"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="onNext">Asynchronous element callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="onNext"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask> onNext)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext is null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      // The lambda is static: everything it needs arrives through its parameters.
      return Create(
          source,
          onNext,
          static (upstream, callback, downstream) => upstream.SubscribeSafeAsync(AsyncObserver.Do(downstream, callback)));
  }
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and <paramref name="onError"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="onError">Asynchronous error callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="onError"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Func<Exception, ValueTask> onError)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onError is null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      // The lambda is static: everything it needs arrives through its parameters.
      return Create(
          source,
          onError,
          static (upstream, callback, downstream) => upstream.SubscribeSafeAsync(AsyncObserver.Do(downstream, callback)));
  }
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and <paramref name="onCompleted"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="onCompleted">Asynchronous completion callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="onCompleted"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Func<ValueTask> onCompleted)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onCompleted is null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      // The lambda is static: everything it needs arrives through its parameters.
      return Create(
          source,
          onCompleted,
          static (upstream, callback, downstream) => upstream.SubscribeSafeAsync(AsyncObserver.Do(downstream, callback)));
  }
  /// <summary>
  /// Builds a sequence via <c>Create</c> whose subscribe delegate subscribes to
  /// <paramref name="source"/> (using <c>SubscribeSafeAsync</c>) with the observer returned by
  /// <c>AsyncObserver.Do</c> for the downstream observer and the three asynchronous callbacks.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="onNext">Asynchronous element callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <param name="onError">Asynchronous error callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <param name="onCompleted">Asynchronous completion callback handed to <c>AsyncObserver.Do</c>.</param>
  /// <returns>The sequence produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<TSource> Do<TSource>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask> onNext, Func<Exception, ValueTask> onError, Func<ValueTask> onCompleted)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext is null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onError is null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      if (onCompleted is null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      // The callbacks travel as one tuple-valued state argument so the lambda can stay static.
      return Create(
          source,
          (onNext, onError, onCompleted),
          static (upstream, handlers, downstream) => upstream.SubscribeSafeAsync(
              AsyncObserver.Do(downstream, handlers.onNext, handlers.onError, handlers.onCompleted)));
  }
  127. }
  128. public partial class AsyncObserver
  129. {
  /// <summary>
  /// Creates an observer that shows every notification to <paramref name="witness"/> first and
  /// then forwards it to <paramref name="observer"/>.
  /// </summary>
  /// <remarks>
  /// If the witness call throws, the thrown exception is sent to
  /// <paramref name="observer"/> through <c>OnErrorAsync</c> and the original notification is
  /// not forwarded.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="witness">Observer invoked before each notification is forwarded.</param>
  /// <returns>The observer built by <c>Create</c> from the three forwarding handlers.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="witness"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, IAsyncObserver<TSource> witness)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (witness is null)
      {
          throw new ArgumentNullException(nameof(witness));
      }

      return Create<TSource>(ForwardNextAsync, ForwardErrorAsync, ForwardCompletedAsync);

      // Element path: witness first, then downstream; a witness failure becomes an error.
      async ValueTask ForwardNextAsync(TSource value)
      {
          try
          {
              await witness.OnNextAsync(value).ConfigureAwait(false);
          }
          catch (Exception failure)
          {
              await observer.OnErrorAsync(failure).ConfigureAwait(false);
              return;
          }

          await observer.OnNextAsync(value).ConfigureAwait(false);
      }

      // Error path: a failure inside the witness replaces the original error downstream.
      async ValueTask ForwardErrorAsync(Exception error)
      {
          try
          {
              await witness.OnErrorAsync(error).ConfigureAwait(false);
          }
          catch (Exception failure)
          {
              await observer.OnErrorAsync(failure).ConfigureAwait(false);
              return;
          }

          await observer.OnErrorAsync(error).ConfigureAwait(false);
      }

      // Completion path: a failure inside the witness turns completion into an error.
      async ValueTask ForwardCompletedAsync()
      {
          try
          {
              await witness.OnCompletedAsync().ConfigureAwait(false);
          }
          catch (Exception failure)
          {
              await observer.OnErrorAsync(failure).ConfigureAwait(false);
              return;
          }

          await observer.OnCompletedAsync().ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Wraps <paramref name="onNext"/> in a witness observer via <c>Create</c> and delegates to
  /// the <c>Do</c> overload that takes a witness observer.
  /// </summary>
  /// <remarks>
  /// How the witness treats error and completion notifications is decided by the single-argument
  /// <c>Create</c> overload, which is defined outside this block.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="onNext">Asynchronous callback used to build the witness.</param>
  /// <returns>The observer returned by the witness-based <c>Do</c> overload.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="onNext"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Func<TSource, ValueTask> onNext)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (onNext is null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      var witness = Create(onNext);

      return Do(observer, witness);
  }
  /// <summary>
  /// Builds a witness observer whose error handler is <paramref name="onError"/> and whose
  /// element and completion handlers just return a default <see cref="ValueTask"/>, then
  /// delegates to the <c>Do</c> overload that takes a witness observer.
  /// </summary>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="onError">Asynchronous callback used as the witness's error handler.</param>
  /// <returns>The observer returned by the witness-based <c>Do</c> overload.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="onError"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Func<Exception, ValueTask> onError)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (onError is null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      var witness = Create<TSource>(
          _ => default,
          onError,
          () => default);

      return Do(observer, witness);
  }
  /// <summary>
  /// Builds a witness observer whose completion handler is <paramref name="onCompleted"/> and
  /// whose element and error handlers just return a default <see cref="ValueTask"/>, then
  /// delegates to the <c>Do</c> overload that takes a witness observer.
  /// </summary>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="onCompleted">Asynchronous callback used as the witness's completion handler.</param>
  /// <returns>The observer returned by the witness-based <c>Do</c> overload.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="onCompleted"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Func<ValueTask> onCompleted)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (onCompleted is null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      var witness = Create<TSource>(
          _ => default,
          _ => default,
          onCompleted);

      return Do(observer, witness);
  }
  /// <summary>
  /// Combines the three asynchronous callbacks into a witness observer via <c>Create</c> and
  /// delegates to the <c>Do</c> overload that takes a witness observer.
  /// </summary>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="onNext">Asynchronous callback used as the witness's element handler.</param>
  /// <param name="onError">Asynchronous callback used as the witness's error handler.</param>
  /// <param name="onCompleted">Asynchronous callback used as the witness's completion handler.</param>
  /// <returns>The observer returned by the witness-based <c>Do</c> overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Func<TSource, ValueTask> onNext, Func<Exception, ValueTask> onError, Func<ValueTask> onCompleted)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (onNext is null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onError is null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      if (onCompleted is null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      var witness = Create(onNext, onError, onCompleted);

      return Do(observer, witness);
  }
  /// <summary>
  /// Creates an observer that shows every notification to the synchronous
  /// <paramref name="witness"/> first and then forwards it to <paramref name="observer"/>.
  /// </summary>
  /// <remarks>
  /// If the witness call throws, the thrown exception is sent to
  /// <paramref name="observer"/> through <c>OnErrorAsync</c> and the original notification is
  /// not forwarded.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="witness">Synchronous observer invoked before each notification is forwarded.</param>
  /// <returns>The observer built by <c>Create</c> from the three forwarding handlers.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="witness"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, IObserver<TSource> witness)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (witness is null)
      {
          throw new ArgumentNullException(nameof(witness));
      }

      return Create<TSource>(ForwardNextAsync, ForwardErrorAsync, ForwardCompletedAsync);

      // Element path: witness first, then downstream; a witness failure becomes an error.
      async ValueTask ForwardNextAsync(TSource value)
      {
          try
          {
              witness.OnNext(value);
          }
          catch (Exception failure)
          {
              await observer.OnErrorAsync(failure).ConfigureAwait(false);
              return;
          }

          await observer.OnNextAsync(value).ConfigureAwait(false);
      }

      // Error path: a failure inside the witness replaces the original error downstream.
      async ValueTask ForwardErrorAsync(Exception error)
      {
          try
          {
              witness.OnError(error);
          }
          catch (Exception failure)
          {
              await observer.OnErrorAsync(failure).ConfigureAwait(false);
              return;
          }

          await observer.OnErrorAsync(error).ConfigureAwait(false);
      }

      // Completion path: a failure inside the witness turns completion into an error.
      async ValueTask ForwardCompletedAsync()
      {
          try
          {
              witness.OnCompleted();
          }
          catch (Exception failure)
          {
              await observer.OnErrorAsync(failure).ConfigureAwait(false);
              return;
          }

          await observer.OnCompletedAsync().ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Adapts the synchronous <paramref name="onNext"/> to a delegate that invokes it and returns
  /// a default <see cref="ValueTask"/>, then delegates to the <c>Do</c> overload that takes an
  /// asynchronous element callback.
  /// </summary>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="onNext">Synchronous callback invoked for each element.</param>
  /// <returns>The observer returned by the asynchronous-callback <c>Do</c> overload.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="onNext"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Action<TSource> onNext)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (onNext is null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      // Explicit delegate type selects the Func<TSource, ValueTask> overload of Do.
      Func<TSource, ValueTask> onNextAsync = value =>
      {
          onNext(value);
          return default;
      };

      return Do(observer, onNextAsync);
  }
  /// <summary>
  /// Builds a witness observer whose error handler invokes the synchronous
  /// <paramref name="onError"/> and whose element and completion handlers just return a default
  /// <see cref="ValueTask"/>, then delegates to the <c>Do</c> overload that takes a witness observer.
  /// </summary>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="onError">Synchronous callback invoked by the witness's error handler.</param>
  /// <returns>The observer returned by the witness-based <c>Do</c> overload.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="onError"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Action<Exception> onError)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (onError is null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      var witness = Create<TSource>(
          _ => default,
          error =>
          {
              onError(error);
              return default;
          },
          () => default);

      return Do(observer, witness);
  }
  /// <summary>
  /// Builds a witness observer whose completion handler invokes the synchronous
  /// <paramref name="onCompleted"/> and whose element and error handlers just return a default
  /// <see cref="ValueTask"/>, then delegates to the <c>Do</c> overload that takes a witness observer.
  /// </summary>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="onCompleted">Synchronous callback invoked by the witness's completion handler.</param>
  /// <returns>The observer returned by the witness-based <c>Do</c> overload.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="onCompleted"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Action onCompleted)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (onCompleted is null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      var witness = Create<TSource>(
          _ => default,
          _ => default,
          () =>
          {
              onCompleted();
              return default;
          });

      return Do(observer, witness);
  }
  /// <summary>
  /// Adapts each synchronous callback to a delegate that invokes it and returns a default
  /// <see cref="ValueTask"/>, then delegates to the <c>Do</c> overload that takes three
  /// asynchronous callbacks.
  /// </summary>
  /// <typeparam name="TSource">Element type of the notifications.</typeparam>
  /// <param name="observer">Observer that receives the forwarded notifications.</param>
  /// <param name="onNext">Synchronous callback invoked for each element.</param>
  /// <param name="onError">Synchronous callback invoked for an error.</param>
  /// <param name="onCompleted">Synchronous callback invoked on completion.</param>
  /// <returns>The observer returned by the asynchronous-callback <c>Do</c> overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObserver<TSource> Do<TSource>(IAsyncObserver<TSource> observer, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (onNext is null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      if (onError is null)
      {
          throw new ArgumentNullException(nameof(onError));
      }

      if (onCompleted is null)
      {
          throw new ArgumentNullException(nameof(onCompleted));
      }

      // Explicit delegate types select the overload of Do that takes ValueTask-returning callbacks.
      Func<TSource, ValueTask> onNextAsync = value =>
      {
          onNext(value);
          return default;
      };

      Func<Exception, ValueTask> onErrorAsync = error =>
      {
          onError(error);
          return default;
      };

      Func<ValueTask> onCompletedAsync = () =>
      {
          onCompleted();
          return default;
      };

      return Do(observer, onNextAsync, onErrorAsync, onCompletedAsync);
  }
  298. }
  299. }