WithLatestFrom.cs 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Reactive.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  public partial class AsyncObservable
  {
      /// <summary>
      /// Pairs each element of <paramref name="first"/> with the latest element observed from
      /// <paramref name="second"/> and emits the pair as a tuple.
      /// </summary>
      /// <typeparam name="TFirst">Element type of the sequence that drives the output.</typeparam>
      /// <typeparam name="TSecond">Element type of the sequence whose latest element is sampled.</typeparam>
      /// <param name="first">Sequence whose elements trigger output.</param>
      /// <param name="second">Sequence supplying the latest value paired with each element of <paramref name="first"/>.</param>
      /// <returns>
      /// A sequence of <c>(first, second)</c> tuples. The pairing rules are those of the observer pair
      /// returned by <c>AsyncObserver.WithLatestFrom</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is <c>null</c>.</exception>
      public static IAsyncObservable<(TFirst first, TSecond second)> WithLatestFrom<TFirst, TSecond>(this IAsyncObservable<TFirst> first, IAsyncObservable<TSecond> second)
      {
          if (first is null)
          {
              throw new ArgumentNullException(nameof(first));
          }

          if (second is null)
          {
              throw new ArgumentNullException(nameof(second));
          }

          // Both sources are handed to the static lambda as arguments, so it captures nothing.
          return CreateAsyncObservable<(TFirst first, TSecond second)>.From(
              first,
              second,
              static async (source, other, sink) =>
              {
                  var (sourceSink, otherSink) = AsyncObserver.WithLatestFrom(sink);

                  // REVIEW: Consider concurrent subscriptions.

                  // Subscriptions are established one after the other: first source, then second source.
                  var sourceSubscription = await source.SubscribeSafeAsync(sourceSink).ConfigureAwait(false);
                  var otherSubscription = await other.SubscribeSafeAsync(otherSink).ConfigureAwait(false);

                  return StableCompositeAsyncDisposable.Create(sourceSubscription, otherSubscription);
              });
      }

      /// <summary>
      /// Combines each element of <paramref name="first"/> with the latest element observed from
      /// <paramref name="second"/> using a synchronous <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TFirst">Element type of the sequence that drives the output.</typeparam>
      /// <typeparam name="TSecond">Element type of the sequence whose latest element is sampled.</typeparam>
      /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
      /// <param name="first">Sequence whose elements trigger output.</param>
      /// <param name="second">Sequence supplying the latest value passed to <paramref name="resultSelector"/>.</param>
      /// <param name="resultSelector">Function that computes a result from an element of <paramref name="first"/> and the latest element of <paramref name="second"/>.</param>
      /// <returns>A sequence of the values produced by <paramref name="resultSelector"/>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="first"/>, <paramref name="second"/> or <paramref name="resultSelector"/> is <c>null</c>.</exception>
      public static IAsyncObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IAsyncObservable<TFirst> first, IAsyncObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
      {
          if (first is null)
          {
              throw new ArgumentNullException(nameof(first));
          }

          if (second is null)
          {
              throw new ArgumentNullException(nameof(second));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          // The second source and the selector travel together as the state tuple of the static lambda.
          return CreateAsyncObservable<TResult>.From(
              first,
              (second, resultSelector),
              static async (source, state, sink) =>
              {
                  var (other, selector) = state;
                  var (sourceSink, otherSink) = AsyncObserver.WithLatestFrom(sink, selector);

                  // REVIEW: Consider concurrent subscriptions.

                  // Subscriptions are established one after the other: first source, then second source.
                  var sourceSubscription = await source.SubscribeSafeAsync(sourceSink).ConfigureAwait(false);
                  var otherSubscription = await other.SubscribeSafeAsync(otherSink).ConfigureAwait(false);

                  return StableCompositeAsyncDisposable.Create(sourceSubscription, otherSubscription);
              });
      }

      /// <summary>
      /// Combines each element of <paramref name="first"/> with the latest element observed from
      /// <paramref name="second"/> using an asynchronous <paramref name="resultSelector"/>.
      /// </summary>
      /// <typeparam name="TFirst">Element type of the sequence that drives the output.</typeparam>
      /// <typeparam name="TSecond">Element type of the sequence whose latest element is sampled.</typeparam>
      /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
      /// <param name="first">Sequence whose elements trigger output.</param>
      /// <param name="second">Sequence supplying the latest value passed to <paramref name="resultSelector"/>.</param>
      /// <param name="resultSelector">Asynchronous function that computes a result from an element of <paramref name="first"/> and the latest element of <paramref name="second"/>.</param>
      /// <returns>A sequence of the values produced by <paramref name="resultSelector"/>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="first"/>, <paramref name="second"/> or <paramref name="resultSelector"/> is <c>null</c>.</exception>
      public static IAsyncObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IAsyncObservable<TFirst> first, IAsyncObservable<TSecond> second, Func<TFirst, TSecond, ValueTask<TResult>> resultSelector)
      {
          if (first is null)
          {
              throw new ArgumentNullException(nameof(first));
          }

          if (second is null)
          {
              throw new ArgumentNullException(nameof(second));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          // The second source and the selector travel together as the state tuple of the static lambda.
          return CreateAsyncObservable<TResult>.From(
              first,
              (second, resultSelector),
              static async (source, state, sink) =>
              {
                  var (other, selector) = state;
                  var (sourceSink, otherSink) = AsyncObserver.WithLatestFrom(sink, selector);

                  // REVIEW: Consider concurrent subscriptions.

                  // Subscriptions are established one after the other: first source, then second source.
                  var sourceSubscription = await source.SubscribeSafeAsync(sourceSink).ConfigureAwait(false);
                  var otherSubscription = await other.SubscribeSafeAsync(otherSink).ConfigureAwait(false);

                  return StableCompositeAsyncDisposable.Create(sourceSubscription, otherSubscription);
              });
      }
  }
  public partial class AsyncObserver
  {
      /// <summary>
      /// Builds the observer pair for WithLatestFrom with a synchronous result selector.
      /// </summary>
      /// <typeparam name="TFirst">Element type received by the first returned observer.</typeparam>
      /// <typeparam name="TSecond">Element type received by the second returned observer.</typeparam>
      /// <typeparam name="TResult">Element type delivered to <paramref name="observer"/>.</typeparam>
      /// <param name="observer">Downstream observer that receives the selector results.</param>
      /// <param name="resultSelector">Function combining a first-source element with the latest second-source element.</param>
      /// <returns>The observer pair produced by the <see cref="ValueTask{TResult}"/>-based overload, with <paramref name="resultSelector"/> wrapped to match it.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="resultSelector"/> is <c>null</c>.</exception>
      public static (IAsyncObserver<TFirst>, IAsyncObserver<TSecond>) WithLatestFrom<TFirst, TSecond, TResult>(IAsyncObserver<TResult> observer, Func<TFirst, TSecond, TResult> resultSelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          // Adapt the synchronous selector to the ValueTask-returning shape expected by the other overload.
          Func<TFirst, TSecond, ValueTask<TResult>> asyncSelector = (x, y) => new ValueTask<TResult>(resultSelector(x, y));

          return WithLatestFrom<TFirst, TSecond, TResult>(observer, asyncSelector);
      }

      /// <summary>
      /// Builds the observer pair for WithLatestFrom with an asynchronous result selector.
      /// </summary>
      /// <remarks>
      /// The second observer records each element it receives as the latest value and ignores completion.
      /// The first observer, for each element, invokes <paramref name="resultSelector"/> with that element
      /// and the latest value and forwards the result to <paramref name="observer"/>; elements arriving
      /// before any latest value exists are dropped. An exception thrown by the selector is forwarded to
      /// <paramref name="observer"/> through <c>OnErrorAsync</c>. Errors from either observer and
      /// completion of the first observer are forwarded. Every downstream call and every update of the
      /// latest value happens while holding the lock of a shared <c>AsyncGate</c>.
      /// </remarks>
      /// <typeparam name="TFirst">Element type received by the first returned observer.</typeparam>
      /// <typeparam name="TSecond">Element type received by the second returned observer.</typeparam>
      /// <typeparam name="TResult">Element type delivered to <paramref name="observer"/>.</typeparam>
      /// <param name="observer">Downstream observer that receives the selector results.</param>
      /// <param name="resultSelector">Asynchronous function combining a first-source element with the latest second-source element.</param>
      /// <returns>A pair of observers: one for the first source and one for the second source.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="resultSelector"/> is <c>null</c>.</exception>
      public static (IAsyncObserver<TFirst>, IAsyncObserver<TSecond>) WithLatestFrom<TFirst, TSecond, TResult>(IAsyncObserver<TResult> observer, Func<TFirst, TSecond, ValueTask<TResult>> resultSelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (resultSelector is null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          var gate = AsyncGate.Create();

          // State shared by both observers; read and written only while the gate is held.
          var latest = default(TSecond);
          var hasLatest = false;

          // Shared error path: both observers forward errors downstream under the gate.
          async ValueTask ForwardErrorAsync(Exception error)
          {
              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  await observer.OnErrorAsync(error).ConfigureAwait(false);
              }
          }

          var firstObserver = Create<TFirst>(
              async x =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      // Nothing to combine with until the second source has produced a value.
                      if (!hasLatest)
                      {
                          return;
                      }

                      TResult projected;

                      try
                      {
                          projected = await resultSelector(x, latest).ConfigureAwait(false);
                      }
                      catch (Exception ex)
                      {
                          // A failing selector is reported downstream as an error; no element is emitted.
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                          return;
                      }

                      await observer.OnNextAsync(projected).ConfigureAwait(false);
                  }
              },
              ForwardErrorAsync,
              async () =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              });

          var secondObserver = Create<TSecond>(
              async y =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      hasLatest = true;
                      latest = y;
                  }
              },
              ForwardErrorAsync,
              // Completion of the second source is not forwarded downstream.
              () => default);

          return (firstObserver, secondObserver);
      }

      /// <summary>
      /// Builds the observer pair for WithLatestFrom that emits <c>(first, second)</c> tuples.
      /// </summary>
      /// <remarks>
      /// The second observer records each element it receives as the latest value and ignores completion.
      /// The first observer forwards a tuple of its element and the latest value to
      /// <paramref name="observer"/>; elements arriving before any latest value exists are dropped.
      /// Errors from either observer and completion of the first observer are forwarded. Every downstream
      /// call and every update of the latest value happens while holding the lock of a shared
      /// <c>AsyncGate</c>.
      /// </remarks>
      /// <typeparam name="TFirst">Element type received by the first returned observer.</typeparam>
      /// <typeparam name="TSecond">Element type received by the second returned observer.</typeparam>
      /// <param name="observer">Downstream observer that receives the tuples.</param>
      /// <returns>A pair of observers: one for the first source and one for the second source.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
      public static (IAsyncObserver<TFirst>, IAsyncObserver<TSecond>) WithLatestFrom<TFirst, TSecond>(IAsyncObserver<(TFirst first, TSecond second)> observer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          var gate = AsyncGate.Create();

          // State shared by both observers; read and written only while the gate is held.
          var latest = default(TSecond);
          var hasLatest = false;

          // Shared error path: both observers forward errors downstream under the gate.
          async ValueTask ForwardErrorAsync(Exception error)
          {
              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  await observer.OnErrorAsync(error).ConfigureAwait(false);
              }
          }

          var firstObserver = Create<TFirst>(
              async x =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      // Nothing to pair with until the second source has produced a value.
                      if (!hasLatest)
                      {
                          return;
                      }

                      await observer.OnNextAsync((x, latest)).ConfigureAwait(false);
                  }
              },
              ForwardErrorAsync,
              async () =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              });

          var secondObserver = Create<TSecond>(
              async y =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      hasLatest = true;
                      latest = y;
                  }
              },
              ForwardErrorAsync,
              // Completion of the second source is not forwarded downstream.
              () => default);

          return (firstObserver, secondObserver);
      }
  }
  193. }