WithLatestFrom.cs 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  9. public partial class AsyncObservable
  10. {
  11. public static IAsyncObservable<(TFirst first, TSecond second)> WithLatestFrom<TFirst, TSecond>(this IAsyncObservable<TFirst> first, IAsyncObservable<TSecond> second)
  12. {
  13. if (first == null)
  14. throw new ArgumentNullException(nameof(first));
  15. if (second == null)
  16. throw new ArgumentNullException(nameof(second));
  17. return Create(
  18. first,
  19. second,
  20. default((TFirst first, TSecond second)),
  21. async (first, second, observer) =>
  22. {
  23. var (firstObserver, secondObserver) = AsyncObserver.WithLatestFrom(observer);
  24. // REVIEW: Consider concurrent subscriptions.
  25. var firstSubscription = await first.SubscribeSafeAsync(firstObserver).ConfigureAwait(false);
  26. var secondSubscription = await second.SubscribeSafeAsync(secondObserver).ConfigureAwait(false);
  27. return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);
  28. });
  29. }
  30. public static IAsyncObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IAsyncObservable<TFirst> first, IAsyncObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  31. {
  32. if (first == null)
  33. throw new ArgumentNullException(nameof(first));
  34. if (second == null)
  35. throw new ArgumentNullException(nameof(second));
  36. if (resultSelector == null)
  37. throw new ArgumentNullException(nameof(resultSelector));
  38. return Create(
  39. first,
  40. (second, resultSelector),
  41. default(TResult),
  42. async (first, state, observer) =>
  43. {
  44. var (firstObserver, secondObserver) = AsyncObserver.WithLatestFrom(observer, state.resultSelector);
  45. // REVIEW: Consider concurrent subscriptions.
  46. var firstSubscription = await first.SubscribeSafeAsync(firstObserver).ConfigureAwait(false);
  47. var secondSubscription = await state.second.SubscribeSafeAsync(secondObserver).ConfigureAwait(false);
  48. return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);
  49. });
  50. }
  51. public static IAsyncObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IAsyncObservable<TFirst> first, IAsyncObservable<TSecond> second, Func<TFirst, TSecond, ValueTask<TResult>> resultSelector)
  52. {
  53. if (first == null)
  54. throw new ArgumentNullException(nameof(first));
  55. if (second == null)
  56. throw new ArgumentNullException(nameof(second));
  57. if (resultSelector == null)
  58. throw new ArgumentNullException(nameof(resultSelector));
  59. return Create(
  60. first,
  61. (second, resultSelector),
  62. default(TResult),
  63. async (first, state, observer) =>
  64. {
  65. var (firstObserver, secondObserver) = AsyncObserver.WithLatestFrom(observer, state.resultSelector);
  66. // REVIEW: Consider concurrent subscriptions.
  67. var firstSubscription = await first.SubscribeSafeAsync(firstObserver).ConfigureAwait(false);
  68. var secondSubscription = await state.second.SubscribeSafeAsync(secondObserver).ConfigureAwait(false);
  69. return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);
  70. });
  71. }
  72. }
  73. public partial class AsyncObserver
  74. {
  75. public static (IAsyncObserver<TFirst>, IAsyncObserver<TSecond>) WithLatestFrom<TFirst, TSecond, TResult>(IAsyncObserver<TResult> observer, Func<TFirst, TSecond, TResult> resultSelector)
  76. {
  77. if (observer == null)
  78. throw new ArgumentNullException(nameof(observer));
  79. if (resultSelector == null)
  80. throw new ArgumentNullException(nameof(resultSelector));
  81. return WithLatestFrom<TFirst, TSecond, TResult>(observer, (x, y) => new ValueTask<TResult>(resultSelector(x, y)));
  82. }
  83. public static (IAsyncObserver<TFirst>, IAsyncObserver<TSecond>) WithLatestFrom<TFirst, TSecond, TResult>(IAsyncObserver<TResult> observer, Func<TFirst, TSecond, ValueTask<TResult>> resultSelector)
  84. {
  85. if (observer == null)
  86. throw new ArgumentNullException(nameof(observer));
  87. if (resultSelector == null)
  88. throw new ArgumentNullException(nameof(resultSelector));
  89. var gate = new AsyncLock();
  90. async ValueTask OnErrorAsync(Exception ex)
  91. {
  92. using (await gate.LockAsync().ConfigureAwait(false))
  93. {
  94. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  95. }
  96. }
  97. var hasLatest = false;
  98. var latest = default(TSecond);
  99. return
  100. (
  101. Create<TFirst>(
  102. async x =>
  103. {
  104. using (await gate.LockAsync().ConfigureAwait(false))
  105. {
  106. if (hasLatest)
  107. {
  108. var res = default(TResult);
  109. try
  110. {
  111. res = await resultSelector(x, latest).ConfigureAwait(false);
  112. }
  113. catch (Exception ex)
  114. {
  115. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  116. return;
  117. }
  118. await observer.OnNextAsync(res).ConfigureAwait(false);
  119. }
  120. }
  121. },
  122. OnErrorAsync,
  123. async () =>
  124. {
  125. using (await gate.LockAsync().ConfigureAwait(false))
  126. {
  127. await observer.OnCompletedAsync().ConfigureAwait(false);
  128. }
  129. }
  130. ),
  131. Create<TSecond>(
  132. async y =>
  133. {
  134. using (await gate.LockAsync().ConfigureAwait(false))
  135. {
  136. hasLatest = true;
  137. latest = y;
  138. }
  139. },
  140. OnErrorAsync,
  141. () => default
  142. )
  143. );
  144. }
  145. public static (IAsyncObserver<TFirst>, IAsyncObserver<TSecond>) WithLatestFrom<TFirst, TSecond>(IAsyncObserver<(TFirst first, TSecond second)> observer)
  146. {
  147. if (observer == null)
  148. throw new ArgumentNullException(nameof(observer));
  149. var gate = new AsyncLock();
  150. async ValueTask OnErrorAsync(Exception ex)
  151. {
  152. using (await gate.LockAsync().ConfigureAwait(false))
  153. {
  154. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  155. }
  156. }
  157. var hasLatest = false;
  158. var latest = default(TSecond);
  159. return
  160. (
  161. Create<TFirst>(
  162. async x =>
  163. {
  164. using (await gate.LockAsync().ConfigureAwait(false))
  165. {
  166. if (hasLatest)
  167. {
  168. await observer.OnNextAsync((first: x, second: latest)).ConfigureAwait(false);
  169. }
  170. }
  171. },
  172. OnErrorAsync,
  173. async () =>
  174. {
  175. using (await gate.LockAsync().ConfigureAwait(false))
  176. {
  177. await observer.OnCompletedAsync().ConfigureAwait(false);
  178. }
  179. }
  180. ),
  181. Create<TSecond>(
  182. async y =>
  183. {
  184. using (await gate.LockAsync().ConfigureAwait(false))
  185. {
  186. hasLatest = true;
  187. latest = y;
  188. }
  189. },
  190. OnErrorAsync,
  191. () => default
  192. )
  193. );
  194. }
  195. }
  196. }