Append.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. namespace System.Reactive.Linq
  8. {
  9. partial class AsyncObservable
  10. {
  11. public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, TSource value)
  12. {
  13. if (source == null)
  14. throw new ArgumentNullException(nameof(source));
  15. return Create<TSource>(observer => source.SubscribeSafeAsync(AsyncObserver.Append(observer, value)));
  16. }
  17. public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, TSource value, IAsyncScheduler scheduler)
  18. {
  19. if (source == null)
  20. throw new ArgumentNullException(nameof(source));
  21. if (scheduler == null)
  22. throw new ArgumentNullException(nameof(scheduler));
  23. return Create<TSource>(async observer =>
  24. {
  25. var d = new CompositeAsyncDisposable();
  26. var (sink, disposable) = AsyncObserver.Append(observer, value, scheduler);
  27. await d.AddAsync(disposable).ConfigureAwait(false);
  28. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  29. await d.AddAsync(subscription).ConfigureAwait(false);
  30. return d;
  31. });
  32. }
  33. public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, params TSource[] values)
  34. {
  35. if (source == null)
  36. throw new ArgumentNullException(nameof(source));
  37. if (values == null)
  38. throw new ArgumentNullException(nameof(values));
  39. return Create<TSource>(observer => source.SubscribeSafeAsync(AsyncObserver.Append(observer, values)));
  40. }
  41. public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, IAsyncScheduler scheduler, params TSource[] values)
  42. {
  43. if (source == null)
  44. throw new ArgumentNullException(nameof(source));
  45. if (scheduler == null)
  46. throw new ArgumentNullException(nameof(scheduler));
  47. if (values == null)
  48. throw new ArgumentNullException(nameof(values));
  49. return Create<TSource>(async observer =>
  50. {
  51. var d = new CompositeAsyncDisposable();
  52. var (sink, disposable) = AsyncObserver.Append(observer, scheduler, values);
  53. await d.AddAsync(disposable).ConfigureAwait(false);
  54. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  55. await d.AddAsync(subscription).ConfigureAwait(false);
  56. return d;
  57. });
  58. }
  59. public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, IEnumerable<TSource> values)
  60. {
  61. if (source == null)
  62. throw new ArgumentNullException(nameof(source));
  63. if (values == null)
  64. throw new ArgumentNullException(nameof(values));
  65. return Create<TSource>(observer => source.SubscribeSafeAsync(AsyncObserver.Append(observer, values)));
  66. }
  67. public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, IAsyncScheduler scheduler, IEnumerable<TSource> values)
  68. {
  69. if (source == null)
  70. throw new ArgumentNullException(nameof(source));
  71. if (scheduler == null)
  72. throw new ArgumentNullException(nameof(scheduler));
  73. if (values == null)
  74. throw new ArgumentNullException(nameof(values));
  75. return Create<TSource>(async observer =>
  76. {
  77. var d = new CompositeAsyncDisposable();
  78. var (sink, disposable) = AsyncObserver.Append(observer, scheduler, values);
  79. await d.AddAsync(disposable).ConfigureAwait(false);
  80. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  81. await d.AddAsync(subscription).ConfigureAwait(false);
  82. return d;
  83. });
  84. }
  85. }
  86. partial class AsyncObserver
  87. {
  88. public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, TSource value)
  89. {
  90. if (observer == null)
  91. throw new ArgumentNullException(nameof(observer));
  92. return Create<TSource>(
  93. observer.OnNextAsync,
  94. observer.OnErrorAsync,
  95. async () =>
  96. {
  97. await observer.OnNextAsync(value).ConfigureAwait(false);
  98. await observer.OnCompletedAsync().ConfigureAwait(false);
  99. }
  100. );
  101. }
  102. public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsyncObserver<TSource> observer, TSource value, IAsyncScheduler scheduler)
  103. {
  104. if (observer == null)
  105. throw new ArgumentNullException(nameof(observer));
  106. if (scheduler == null)
  107. throw new ArgumentNullException(nameof(scheduler));
  108. var d = new SingleAssignmentAsyncDisposable();
  109. return
  110. (
  111. Create<TSource>(
  112. observer.OnNextAsync,
  113. observer.OnErrorAsync,
  114. async () =>
  115. {
  116. var task = await scheduler.ScheduleAsync(async ct =>
  117. {
  118. if (!ct.IsCancellationRequested)
  119. {
  120. await observer.OnNextAsync(value).RendezVous(scheduler, ct);
  121. await observer.OnCompletedAsync().RendezVous(scheduler, ct);
  122. }
  123. }).ConfigureAwait(false);
  124. await d.AssignAsync(task).ConfigureAwait(false);
  125. }
  126. ),
  127. d
  128. );
  129. }
  130. public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, params TSource[] values)
  131. {
  132. if (observer == null)
  133. throw new ArgumentNullException(nameof(observer));
  134. if (values == null)
  135. throw new ArgumentNullException(nameof(values));
  136. return Create<TSource>(
  137. observer.OnNextAsync,
  138. observer.OnErrorAsync,
  139. async () =>
  140. {
  141. foreach (var value in values)
  142. {
  143. await observer.OnNextAsync(value).ConfigureAwait(false);
  144. }
  145. await observer.OnCompletedAsync().ConfigureAwait(false);
  146. }
  147. );
  148. }
  149. public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, params TSource[] values)
  150. {
  151. if (observer == null)
  152. throw new ArgumentNullException(nameof(observer));
  153. if (values == null)
  154. throw new ArgumentNullException(nameof(values));
  155. if (scheduler == null)
  156. throw new ArgumentNullException(nameof(scheduler));
  157. var d = new SingleAssignmentAsyncDisposable();
  158. return
  159. (
  160. Create<TSource>(
  161. observer.OnNextAsync,
  162. observer.OnErrorAsync,
  163. async () =>
  164. {
  165. var task = await scheduler.ScheduleAsync(async ct =>
  166. {
  167. for (var i = 0; i < values.Length && !ct.IsCancellationRequested; i++)
  168. {
  169. await observer.OnNextAsync(values[i]).RendezVous(scheduler, ct);
  170. }
  171. await observer.OnCompletedAsync().RendezVous(scheduler, ct);
  172. }).ConfigureAwait(false);
  173. await d.AssignAsync(task).ConfigureAwait(false);
  174. }
  175. ),
  176. d
  177. );
  178. }
  179. public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, IEnumerable<TSource> values)
  180. {
  181. if (observer == null)
  182. throw new ArgumentNullException(nameof(observer));
  183. if (values == null)
  184. throw new ArgumentNullException(nameof(values));
  185. return Create<TSource>(
  186. observer.OnNextAsync,
  187. observer.OnErrorAsync,
  188. async () =>
  189. {
  190. foreach (var value in values)
  191. {
  192. await observer.OnNextAsync(value).ConfigureAwait(false);
  193. }
  194. await observer.OnCompletedAsync().ConfigureAwait(false);
  195. }
  196. );
  197. }
  198. public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, IEnumerable<TSource> values)
  199. {
  200. if (observer == null)
  201. throw new ArgumentNullException(nameof(observer));
  202. if (values == null)
  203. throw new ArgumentNullException(nameof(values));
  204. if (scheduler == null)
  205. throw new ArgumentNullException(nameof(scheduler));
  206. var d = new SingleAssignmentAsyncDisposable();
  207. return
  208. (
  209. Create<TSource>(
  210. observer.OnNextAsync,
  211. observer.OnErrorAsync,
  212. async () =>
  213. {
  214. var task = await scheduler.ScheduleAsync(async ct =>
  215. {
  216. var e = default(IEnumerator<TSource>);
  217. try
  218. {
  219. e = values.GetEnumerator();
  220. }
  221. catch (Exception ex)
  222. {
  223. await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
  224. return;
  225. }
  226. using (e)
  227. {
  228. while (!ct.IsCancellationRequested)
  229. {
  230. var b = default(bool);
  231. var value = default(TSource);
  232. try
  233. {
  234. b = e.MoveNext();
  235. if (b)
  236. {
  237. value = e.Current;
  238. }
  239. }
  240. catch (Exception ex)
  241. {
  242. await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
  243. return;
  244. }
  245. if (b)
  246. {
  247. await observer.OnNextAsync(value).RendezVous(scheduler, ct);
  248. }
  249. else
  250. {
  251. break;
  252. }
  253. }
  254. }
  255. await observer.OnCompletedAsync().RendezVous(scheduler, ct);
  256. }).ConfigureAwait(false);
  257. await d.AssignAsync(task).ConfigureAwait(false);
  258. }
  259. ),
  260. d
  261. );
  262. }
  263. }
  264. }