Append.cs 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. namespace System.Reactive.Linq
  8. {
  partial class AsyncObservable
  {
      /// <summary>
      /// Creates a sequence that subscribes to <paramref name="source"/> through the observer returned by
      /// <c>AsyncObserver.Append(observer, value)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to append to.</param>
      /// <param name="value">Value handed to <c>AsyncObserver.Append</c> as the element to append.</param>
      /// <returns>The sequence produced by <c>Create</c>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
      public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, TSource value)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          return Create<TSource>(observer =>
          {
              var sink = AsyncObserver.Append(observer, value);
              return source.SubscribeSafeAsync(sink);
          });
      }

      /// <summary>
      /// Creates a sequence that subscribes to <paramref name="source"/> through the observer returned by
      /// <c>AsyncObserver.Append(observer, value, scheduler)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to append to.</param>
      /// <param name="value">Value handed to <c>AsyncObserver.Append</c> as the element to append.</param>
      /// <param name="scheduler">Scheduler handed to <c>AsyncObserver.Append</c>.</param>
      /// <returns>
      /// The sequence produced by <c>Create</c>. Each subscription returns a composite disposable holding the
      /// disposable from <c>AsyncObserver.Append</c> and the subscription to <paramref name="source"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, TSource value, IAsyncScheduler scheduler)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (scheduler is null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          return Create<TSource>(async observer =>
          {
              var resources = new CompositeAsyncDisposable();

              // Register the scheduler-side disposable before subscribing to the source.
              var (sink, scheduledWork) = AsyncObserver.Append(observer, value, scheduler);
              await resources.AddAsync(scheduledWork).ConfigureAwait(false);

              var upstream = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
              await resources.AddAsync(upstream).ConfigureAwait(false);

              return resources;
          });
      }

      /// <summary>
      /// Creates a sequence that subscribes to <paramref name="source"/> through the observer returned by
      /// <c>AsyncObserver.Append(observer, values)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to append to.</param>
      /// <param name="values">Values handed to <c>AsyncObserver.Append</c> as the elements to append.</param>
      /// <returns>The sequence produced by <c>Create</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="values"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, params TSource[] values)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (values is null)
          {
              throw new ArgumentNullException(nameof(values));
          }

          return Create<TSource>(observer =>
          {
              var sink = AsyncObserver.Append(observer, values);
              return source.SubscribeSafeAsync(sink);
          });
      }

      /// <summary>
      /// Creates a sequence that subscribes to <paramref name="source"/> through the observer returned by
      /// <c>AsyncObserver.Append(observer, scheduler, values)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to append to.</param>
      /// <param name="scheduler">Scheduler handed to <c>AsyncObserver.Append</c>.</param>
      /// <param name="values">Values handed to <c>AsyncObserver.Append</c> as the elements to append.</param>
      /// <returns>
      /// The sequence produced by <c>Create</c>. Each subscription returns a composite disposable holding the
      /// disposable from <c>AsyncObserver.Append</c> and the subscription to <paramref name="source"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="scheduler"/> or <paramref name="values"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, IAsyncScheduler scheduler, params TSource[] values)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (scheduler is null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          if (values is null)
          {
              throw new ArgumentNullException(nameof(values));
          }

          return Create<TSource>(async observer =>
          {
              var resources = new CompositeAsyncDisposable();

              // Register the scheduler-side disposable before subscribing to the source.
              var (sink, scheduledWork) = AsyncObserver.Append(observer, scheduler, values);
              await resources.AddAsync(scheduledWork).ConfigureAwait(false);

              var upstream = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
              await resources.AddAsync(upstream).ConfigureAwait(false);

              return resources;
          });
      }

      /// <summary>
      /// Creates a sequence that subscribes to <paramref name="source"/> through the observer returned by
      /// <c>AsyncObserver.Append(observer, values)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to append to.</param>
      /// <param name="values">Values handed to <c>AsyncObserver.Append</c> as the elements to append.</param>
      /// <returns>The sequence produced by <c>Create</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="values"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, IEnumerable<TSource> values)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (values is null)
          {
              throw new ArgumentNullException(nameof(values));
          }

          return Create<TSource>(observer =>
          {
              var sink = AsyncObserver.Append(observer, values);
              return source.SubscribeSafeAsync(sink);
          });
      }

      /// <summary>
      /// Creates a sequence that subscribes to <paramref name="source"/> through the observer returned by
      /// <c>AsyncObserver.Append(observer, scheduler, values)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to append to.</param>
      /// <param name="scheduler">Scheduler handed to <c>AsyncObserver.Append</c>.</param>
      /// <param name="values">Values handed to <c>AsyncObserver.Append</c> as the elements to append.</param>
      /// <returns>
      /// The sequence produced by <c>Create</c>. Each subscription returns a composite disposable holding the
      /// disposable from <c>AsyncObserver.Append</c> and the subscription to <paramref name="source"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="scheduler"/> or <paramref name="values"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, IAsyncScheduler scheduler, IEnumerable<TSource> values)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (scheduler is null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          if (values is null)
          {
              throw new ArgumentNullException(nameof(values));
          }

          return Create<TSource>(async observer =>
          {
              var resources = new CompositeAsyncDisposable();

              // Register the scheduler-side disposable before subscribing to the source.
              var (sink, scheduledWork) = AsyncObserver.Append(observer, scheduler, values);
              await resources.AddAsync(scheduledWork).ConfigureAwait(false);

              var upstream = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
              await resources.AddAsync(upstream).ConfigureAwait(false);

              return resources;
          });
      }
  }
  partial class AsyncObserver
  {
      /// <summary>
      /// Wraps <paramref name="observer"/> so that <paramref name="value"/> is emitted right before completion.
      /// </summary>
      /// <typeparam name="TSource">Element type observed.</typeparam>
      /// <param name="observer">Observer receiving the notifications.</param>
      /// <param name="value">Element emitted when the wrapper is completed.</param>
      /// <returns>
      /// An observer that passes elements and errors straight to <paramref name="observer"/> and, on completion,
      /// first sends <paramref name="value"/> and then completes <paramref name="observer"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, TSource value)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          return Create<TSource>(
              observer.OnNextAsync,
              observer.OnErrorAsync,
              async () =>
              {
                  await observer.OnNextAsync(value).ConfigureAwait(false);
                  await observer.OnCompletedAsync().ConfigureAwait(false);
              }
          );
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> so that, on completion, emitting <paramref name="value"/> and the
      /// completion notification is scheduled on <paramref name="scheduler"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type observed.</typeparam>
      /// <param name="observer">Observer receiving the notifications.</param>
      /// <param name="value">Element emitted by the scheduled work.</param>
      /// <param name="scheduler">Scheduler used to run the appended notifications.</param>
      /// <returns>
      /// The wrapping observer, paired with a disposable that is assigned the scheduled work once the wrapper
      /// has been completed.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsyncObserver<TSource> observer, TSource value, IAsyncScheduler scheduler)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          var d = new SingleAssignmentAsyncDisposable();

          return
          (
              Create<TSource>(
                  observer.OnNextAsync,
                  observer.OnErrorAsync,
                  async () =>
                  {
                      var task = await scheduler.ScheduleAsync(async ct =>
                      {
                          // Skip both notifications when the scheduled work was cancelled before it started.
                          if (!ct.IsCancellationRequested)
                          {
                              await observer.OnNextAsync(value).RendezVous(scheduler, ct);
                              await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                          }
                      }).ConfigureAwait(false);

                      // Expose the scheduled work through the disposable returned to the caller.
                      await d.AssignAsync(task).ConfigureAwait(false);
                  }
              ),
              d
          );
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> so that every element of <paramref name="values"/> is emitted, in
      /// order, right before completion.
      /// </summary>
      /// <typeparam name="TSource">Element type observed.</typeparam>
      /// <param name="observer">Observer receiving the notifications.</param>
      /// <param name="values">Elements emitted when the wrapper is completed.</param>
      /// <returns>
      /// An observer that passes elements and errors straight to <paramref name="observer"/> and, on completion,
      /// sends each of <paramref name="values"/> and then completes <paramref name="observer"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="values"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, params TSource[] values)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (values == null)
          {
              throw new ArgumentNullException(nameof(values));
          }

          return Create<TSource>(
              observer.OnNextAsync,
              observer.OnErrorAsync,
              async () =>
              {
                  foreach (var value in values)
                  {
                      await observer.OnNextAsync(value).ConfigureAwait(false);
                  }

                  await observer.OnCompletedAsync().ConfigureAwait(false);
              }
          );
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> so that, on completion, emitting every element of
      /// <paramref name="values"/> followed by the completion notification is scheduled on
      /// <paramref name="scheduler"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type observed.</typeparam>
      /// <param name="observer">Observer receiving the notifications.</param>
      /// <param name="scheduler">Scheduler used to run the appended notifications.</param>
      /// <param name="values">Elements emitted by the scheduled work.</param>
      /// <returns>
      /// The wrapping observer, paired with a disposable that is assigned the scheduled work once the wrapper
      /// has been completed.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/>, <paramref name="values"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, params TSource[] values)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (values == null)
          {
              throw new ArgumentNullException(nameof(values));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          // An array is an IEnumerable<TSource>; the cast selects the enumerable overload, which holds the
          // single implementation of the scheduled multi-value append.
          return Append<TSource>(observer, scheduler, (IEnumerable<TSource>)values);
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> so that every element of <paramref name="values"/> is emitted, in
      /// enumeration order, right before completion.
      /// </summary>
      /// <typeparam name="TSource">Element type observed.</typeparam>
      /// <param name="observer">Observer receiving the notifications.</param>
      /// <param name="values">Elements emitted when the wrapper is completed; enumerated at that point.</param>
      /// <returns>
      /// An observer that passes elements and errors straight to <paramref name="observer"/> and, on completion,
      /// sends each of <paramref name="values"/> and then completes <paramref name="observer"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="values"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, IEnumerable<TSource> values)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (values == null)
          {
              throw new ArgumentNullException(nameof(values));
          }

          return Create<TSource>(
              observer.OnNextAsync,
              observer.OnErrorAsync,
              async () =>
              {
                  foreach (var value in values)
                  {
                      await observer.OnNextAsync(value).ConfigureAwait(false);
                  }

                  await observer.OnCompletedAsync().ConfigureAwait(false);
              }
          );
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> so that, on completion, emitting every element of
      /// <paramref name="values"/> followed by the completion notification is scheduled on
      /// <paramref name="scheduler"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type observed.</typeparam>
      /// <param name="observer">Observer receiving the notifications.</param>
      /// <param name="scheduler">Scheduler used to run the appended notifications.</param>
      /// <param name="values">Elements emitted by the scheduled work; enumerated inside that work.</param>
      /// <returns>
      /// The wrapping observer, paired with a disposable that is assigned the scheduled work once the wrapper
      /// has been completed.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/>, <paramref name="values"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, IEnumerable<TSource> values)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (values == null)
          {
              throw new ArgumentNullException(nameof(values));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          var d = new SingleAssignmentAsyncDisposable();

          return
          (
              Create<TSource>(
                  observer.OnNextAsync,
                  observer.OnErrorAsync,
                  async () =>
                  {
                      var task = await scheduler.ScheduleAsync(async ct =>
                      {
                          foreach (var value in values)
                          {
                              // Stop emitting as soon as the scheduled work is cancelled.
                              if (ct.IsCancellationRequested)
                              {
                                  return;
                              }

                              await observer.OnNextAsync(value).RendezVous(scheduler, ct);
                          }

                          // Same guard as the single-value overload: no completion after cancellation.
                          if (!ct.IsCancellationRequested)
                          {
                              await observer.OnCompletedAsync().RendezVous(scheduler, ct);
                          }
                      }).ConfigureAwait(false);

                      // Expose the scheduled work through the disposable returned to the caller.
                      await d.AssignAsync(task).ConfigureAwait(false);
                  }
              ),
              d
          );
      }
  }
  189. }