SelectMany.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  9. public partial class AsyncObservable
  10. {
  11. public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, IAsyncObservable<TResult>> selector)
  12. {
  13. if (source == null)
  14. throw new ArgumentNullException(nameof(source));
  15. if (selector == null)
  16. throw new ArgumentNullException(nameof(selector));
  17. return CreateAsyncObservable<TResult>.From(
  18. source,
  19. selector,
  20. static async (source, selector, observer) =>
  21. {
  22. var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
  23. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  24. return StableCompositeAsyncDisposable.Create(subscription, inner);
  25. });
  26. }
  27. public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<IAsyncObservable<TResult>>> selector)
  28. {
  29. if (source == null)
  30. throw new ArgumentNullException(nameof(source));
  31. if (selector == null)
  32. throw new ArgumentNullException(nameof(selector));
  33. return CreateAsyncObservable<TResult>.From(
  34. source,
  35. selector,
  36. static async (source, selector, observer) =>
  37. {
  38. var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
  39. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  40. return StableCompositeAsyncDisposable.Create(subscription, inner);
  41. });
  42. }
  43. public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  44. {
  45. if (source == null)
  46. throw new ArgumentNullException(nameof(source));
  47. if (collectionSelector == null)
  48. throw new ArgumentNullException(nameof(collectionSelector));
  49. if (resultSelector == null)
  50. throw new ArgumentNullException(nameof(resultSelector));
  51. return CreateAsyncObservable<TResult>.From(
  52. source,
  53. (collectionSelector, resultSelector),
  54. static async (source, state, observer) =>
  55. {
  56. var (sink, inner) = AsyncObserver.SelectMany(observer, state.collectionSelector, state.resultSelector);
  57. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  58. return StableCompositeAsyncDisposable.Create(subscription, inner);
  59. });
  60. }
  61. public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  62. {
  63. if (source == null)
  64. throw new ArgumentNullException(nameof(source));
  65. if (collectionSelector == null)
  66. throw new ArgumentNullException(nameof(collectionSelector));
  67. if (resultSelector == null)
  68. throw new ArgumentNullException(nameof(resultSelector));
  69. return CreateAsyncObservable<TResult>.From(
  70. source,
  71. (collectionSelector, resultSelector),
  72. static async (source, state, observer) =>
  73. {
  74. var (sink, inner) = AsyncObserver.SelectMany(observer, state.collectionSelector, state.resultSelector);
  75. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  76. return StableCompositeAsyncDisposable.Create(subscription, inner);
  77. });
  78. }
  79. public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, IAsyncObservable<TResult>> selector)
  80. {
  81. if (source == null)
  82. throw new ArgumentNullException(nameof(source));
  83. if (selector == null)
  84. throw new ArgumentNullException(nameof(selector));
  85. return CreateAsyncObservable<TResult>.From(
  86. source,
  87. selector,
  88. static async (source, selector, observer) =>
  89. {
  90. var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
  91. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  92. return StableCompositeAsyncDisposable.Create(subscription, inner);
  93. });
  94. }
  95. public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, ValueTask<IAsyncObservable<TResult>>> selector)
  96. {
  97. if (source == null)
  98. throw new ArgumentNullException(nameof(source));
  99. if (selector == null)
  100. throw new ArgumentNullException(nameof(selector));
  101. return CreateAsyncObservable<TResult>.From(
  102. source,
  103. selector,
  104. static async (source, selector, observer) =>
  105. {
  106. var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
  107. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  108. return StableCompositeAsyncDisposable.Create(subscription, inner);
  109. });
  110. }
  111. public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  112. {
  113. if (source == null)
  114. throw new ArgumentNullException(nameof(source));
  115. if (collectionSelector == null)
  116. throw new ArgumentNullException(nameof(collectionSelector));
  117. if (resultSelector == null)
  118. throw new ArgumentNullException(nameof(resultSelector));
  119. return CreateAsyncObservable<TResult>.From(
  120. source,
  121. (collectionSelector, resultSelector),
  122. static async (source, state, observer) =>
  123. {
  124. var (sink, inner) = AsyncObserver.SelectMany(observer, state.collectionSelector, state.resultSelector);
  125. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  126. return StableCompositeAsyncDisposable.Create(subscription, inner);
  127. });
  128. }
  129. public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, int, TCollection, int, ValueTask<TResult>> resultSelector)
  130. {
  131. if (source == null)
  132. throw new ArgumentNullException(nameof(source));
  133. if (collectionSelector == null)
  134. throw new ArgumentNullException(nameof(collectionSelector));
  135. if (resultSelector == null)
  136. throw new ArgumentNullException(nameof(resultSelector));
  137. return CreateAsyncObservable<TResult>.From(
  138. source,
  139. (collectionSelector, resultSelector),
  140. static async (source, state, observer) =>
  141. {
  142. var (sink, inner) = AsyncObserver.SelectMany(observer, state.collectionSelector, state.resultSelector);
  143. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  144. return StableCompositeAsyncDisposable.Create(subscription, inner);
  145. });
  146. }
  147. }
  148. public partial class AsyncObserver
  149. {
  150. public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, IAsyncObservable<TResult>> selector)
  151. {
  152. if (observer == null)
  153. throw new ArgumentNullException(nameof(observer));
  154. if (selector == null)
  155. throw new ArgumentNullException(nameof(selector));
  156. return SelectMany<TSource, TResult, TResult>(observer, x => new ValueTask<IAsyncObservable<TResult>>(selector(x)), (x, y) => new ValueTask<TResult>(y));
  157. }
  158. public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, ValueTask<IAsyncObservable<TResult>>> selector)
  159. {
  160. if (observer == null)
  161. throw new ArgumentNullException(nameof(observer));
  162. if (selector == null)
  163. throw new ArgumentNullException(nameof(selector));
  164. return SelectMany<TSource, TResult, TResult>(observer, selector, (x, y) => new ValueTask<TResult>(y));
  165. }
  166. public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  167. {
  168. if (observer == null)
  169. throw new ArgumentNullException(nameof(observer));
  170. if (collectionSelector == null)
  171. throw new ArgumentNullException(nameof(collectionSelector));
  172. if (resultSelector == null)
  173. throw new ArgumentNullException(nameof(resultSelector));
  174. return SelectMany<TSource, TCollection, TResult>(observer, x => new ValueTask<IAsyncObservable<TCollection>>(collectionSelector(x)), (x, y) => new ValueTask<TResult>(resultSelector(x, y)));
  175. }
  176. public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector)
  177. {
  178. if (observer == null)
  179. throw new ArgumentNullException(nameof(observer));
  180. if (collectionSelector == null)
  181. throw new ArgumentNullException(nameof(collectionSelector));
  182. if (resultSelector == null)
  183. throw new ArgumentNullException(nameof(resultSelector));
  184. var gate = new AsyncGate();
  185. var count = 1;
  186. var disposable = new CompositeAsyncDisposable();
  187. async ValueTask OnErrorAsync(Exception ex)
  188. {
  189. using (await gate.LockAsync().ConfigureAwait(false))
  190. {
  191. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  192. }
  193. };
  194. async ValueTask OnCompletedAsync()
  195. {
  196. using (await gate.LockAsync().ConfigureAwait(false))
  197. {
  198. if (--count == 0)
  199. {
  200. await observer.OnCompletedAsync().ConfigureAwait(false);
  201. }
  202. }
  203. };
  204. return
  205. (
  206. Create<TSource>(
  207. async x =>
  208. {
  209. var collection = default(IAsyncObservable<TCollection>);
  210. try
  211. {
  212. collection = await collectionSelector(x).ConfigureAwait(false);
  213. }
  214. catch (Exception ex)
  215. {
  216. await OnErrorAsync(ex).ConfigureAwait(false);
  217. return;
  218. }
  219. using (await gate.LockAsync().ConfigureAwait(false))
  220. {
  221. count++;
  222. }
  223. var inner = new SingleAssignmentAsyncDisposable();
  224. await disposable.AddAsync(inner).ConfigureAwait(false);
  225. var innerObserver = Create<TCollection>(
  226. async y =>
  227. {
  228. var res = default(TResult);
  229. try
  230. {
  231. res = await resultSelector(x, y).ConfigureAwait(false);
  232. }
  233. catch (Exception ex)
  234. {
  235. await OnErrorAsync(ex).ConfigureAwait(false);
  236. return;
  237. }
  238. using (await gate.LockAsync().ConfigureAwait(false))
  239. {
  240. await observer.OnNextAsync(res).ConfigureAwait(false);
  241. }
  242. },
  243. OnErrorAsync,
  244. async () =>
  245. {
  246. await OnCompletedAsync().ConfigureAwait(false);
  247. await disposable.RemoveAsync(inner).ConfigureAwait(false);
  248. }
  249. );
  250. var innerSubscription = await collection.SubscribeSafeAsync(innerObserver).ConfigureAwait(false);
  251. await inner.AssignAsync(innerSubscription).ConfigureAwait(false);
  252. },
  253. OnErrorAsync,
  254. OnCompletedAsync
  255. ),
  256. disposable
  257. );
  258. }
  259. public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, IAsyncObservable<TResult>> selector)
  260. {
  261. if (observer == null)
  262. throw new ArgumentNullException(nameof(observer));
  263. if (selector == null)
  264. throw new ArgumentNullException(nameof(selector));
  265. return SelectMany<TSource, TResult, TResult>(observer, (x, i) => new ValueTask<IAsyncObservable<TResult>>(selector(x, i)), (x, i, y, j) => new ValueTask<TResult>(y));
  266. }
  267. public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, ValueTask<IAsyncObservable<TResult>>> selector)
  268. {
  269. if (observer == null)
  270. throw new ArgumentNullException(nameof(observer));
  271. if (selector == null)
  272. throw new ArgumentNullException(nameof(selector));
  273. return SelectMany<TSource, TResult, TResult>(observer, selector, (x, i, y, j) => new ValueTask<TResult>(y));
  274. }
  275. public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  276. {
  277. if (observer == null)
  278. throw new ArgumentNullException(nameof(observer));
  279. if (collectionSelector == null)
  280. throw new ArgumentNullException(nameof(collectionSelector));
  281. if (resultSelector == null)
  282. throw new ArgumentNullException(nameof(resultSelector));
  283. return SelectMany<TSource, TCollection, TResult>(observer, (x, i) => new ValueTask<IAsyncObservable<TCollection>>(collectionSelector(x, i)), (x, i, y, j) => new ValueTask<TResult>(resultSelector(x, i, y, j)));
  284. }
  285. public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, ValueTask<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, int, TCollection, int, ValueTask<TResult>> resultSelector)
  286. {
  287. if (observer == null)
  288. throw new ArgumentNullException(nameof(observer));
  289. if (collectionSelector == null)
  290. throw new ArgumentNullException(nameof(collectionSelector));
  291. if (resultSelector == null)
  292. throw new ArgumentNullException(nameof(resultSelector));
  293. async ValueTask<IAsyncObservable<(TCollection item, int i)>> collectionSelectorWithIndex((TSource item, int i) t) => (await collectionSelector(t.item, t.i).ConfigureAwait(false)).Select((item, i) => (item, i));
  294. ValueTask<TResult> resultSelectorWithIndex((TSource item, int i) outer, (TCollection item, int i) inner) => resultSelector(outer.item, outer.i, inner.item, inner.i);
  295. var (outerObserverWithIndex, disposable) = SelectMany(observer, collectionSelectorWithIndex, (Func<(TSource item, int i), (TCollection item, int i), ValueTask<TResult>>)resultSelectorWithIndex);
  296. var outerObserver = Select<TSource, (TSource item, int i)>(outerObserverWithIndex, (item, i) => (item, i));
  297. return (outerObserver, disposable);
  298. }
  299. }
  300. }