WithLatestFrom.cs 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  9. partial class AsyncObservable
  10. {
  11. public static IAsyncObservable<(TFirst first, TSecond second)> WithLatestFrom<TFirst, TSecond>(this IAsyncObservable<TFirst> first, IAsyncObservable<TSecond> second)
  12. {
  13. if (first == null)
  14. throw new ArgumentNullException(nameof(first));
  15. if (second == null)
  16. throw new ArgumentNullException(nameof(second));
  17. return Create<(TFirst first, TSecond second)>(async observer =>
  18. {
  19. var (firstObserver, secondObserver) = AsyncObserver.WithLatestFrom(observer);
  20. // REVIEW: Consider concurrent subscriptions.
  21. var firstSubscription = await first.SubscribeSafeAsync(firstObserver).ConfigureAwait(false);
  22. var secondSubscription = await second.SubscribeSafeAsync(secondObserver).ConfigureAwait(false);
  23. return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);
  24. });
  25. }
  26. public static IAsyncObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IAsyncObservable<TFirst> first, IAsyncObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  27. {
  28. if (first == null)
  29. throw new ArgumentNullException(nameof(first));
  30. if (second == null)
  31. throw new ArgumentNullException(nameof(second));
  32. if (resultSelector == null)
  33. throw new ArgumentNullException(nameof(resultSelector));
  34. return Create<TResult>(async observer =>
  35. {
  36. var (firstObserver, secondObserver) = AsyncObserver.WithLatestFrom(observer, resultSelector);
  37. // REVIEW: Consider concurrent subscriptions.
  38. var firstSubscription = await first.SubscribeSafeAsync(firstObserver).ConfigureAwait(false);
  39. var secondSubscription = await second.SubscribeSafeAsync(secondObserver).ConfigureAwait(false);
  40. return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);
  41. });
  42. }
  43. public static IAsyncObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IAsyncObservable<TFirst> first, IAsyncObservable<TSecond> second, Func<TFirst, TSecond, Task<TResult>> resultSelector)
  44. {
  45. if (first == null)
  46. throw new ArgumentNullException(nameof(first));
  47. if (second == null)
  48. throw new ArgumentNullException(nameof(second));
  49. if (resultSelector == null)
  50. throw new ArgumentNullException(nameof(resultSelector));
  51. return Create<TResult>(async observer =>
  52. {
  53. var (firstObserver, secondObserver) = AsyncObserver.WithLatestFrom(observer, resultSelector);
  54. // REVIEW: Consider concurrent subscriptions.
  55. var firstSubscription = await first.SubscribeSafeAsync(firstObserver).ConfigureAwait(false);
  56. var secondSubscription = await second.SubscribeSafeAsync(secondObserver).ConfigureAwait(false);
  57. return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);
  58. });
  59. }
  60. }
  61. partial class AsyncObserver
  62. {
  63. public static (IAsyncObserver<TFirst>, IAsyncObserver<TSecond>) WithLatestFrom<TFirst, TSecond, TResult>(IAsyncObserver<TResult> observer, Func<TFirst, TSecond, TResult> resultSelector)
  64. {
  65. if (observer == null)
  66. throw new ArgumentNullException(nameof(observer));
  67. if (resultSelector == null)
  68. throw new ArgumentNullException(nameof(resultSelector));
  69. return WithLatestFrom<TFirst, TSecond, TResult>(observer, (x, y) => Task.FromResult(resultSelector(x, y)));
  70. }
  71. public static (IAsyncObserver<TFirst>, IAsyncObserver<TSecond>) WithLatestFrom<TFirst, TSecond, TResult>(IAsyncObserver<TResult> observer, Func<TFirst, TSecond, Task<TResult>> resultSelector)
  72. {
  73. if (observer == null)
  74. throw new ArgumentNullException(nameof(observer));
  75. if (resultSelector == null)
  76. throw new ArgumentNullException(nameof(resultSelector));
  77. var gate = new AsyncLock();
  78. async Task OnErrorAsync(Exception ex)
  79. {
  80. using (await gate.LockAsync().ConfigureAwait(false))
  81. {
  82. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  83. }
  84. }
  85. var hasLatest = false;
  86. var latest = default(TSecond);
  87. return
  88. (
  89. Create<TFirst>(
  90. async x =>
  91. {
  92. using (await gate.LockAsync().ConfigureAwait(false))
  93. {
  94. if (hasLatest)
  95. {
  96. var res = default(TResult);
  97. try
  98. {
  99. res = await resultSelector(x, latest).ConfigureAwait(false);
  100. }
  101. catch (Exception ex)
  102. {
  103. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  104. return;
  105. }
  106. await observer.OnNextAsync(res).ConfigureAwait(false);
  107. }
  108. }
  109. },
  110. OnErrorAsync,
  111. async () =>
  112. {
  113. using (await gate.LockAsync().ConfigureAwait(false))
  114. {
  115. await observer.OnCompletedAsync().ConfigureAwait(false);
  116. }
  117. }
  118. ),
  119. Create<TSecond>(
  120. async y =>
  121. {
  122. using (await gate.LockAsync().ConfigureAwait(false))
  123. {
  124. hasLatest = true;
  125. latest = y;
  126. }
  127. },
  128. OnErrorAsync,
  129. () => Task.CompletedTask
  130. )
  131. );
  132. }
  133. public static (IAsyncObserver<TFirst>, IAsyncObserver<TSecond>) WithLatestFrom<TFirst, TSecond>(IAsyncObserver<(TFirst first, TSecond second)> observer)
  134. {
  135. if (observer == null)
  136. throw new ArgumentNullException(nameof(observer));
  137. var gate = new AsyncLock();
  138. async Task OnErrorAsync(Exception ex)
  139. {
  140. using (await gate.LockAsync().ConfigureAwait(false))
  141. {
  142. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  143. }
  144. }
  145. var hasLatest = false;
  146. var latest = default(TSecond);
  147. return
  148. (
  149. Create<TFirst>(
  150. async x =>
  151. {
  152. using (await gate.LockAsync().ConfigureAwait(false))
  153. {
  154. if (hasLatest)
  155. {
  156. await observer.OnNextAsync((first: x, second: latest)).ConfigureAwait(false);
  157. }
  158. }
  159. },
  160. OnErrorAsync,
  161. async () =>
  162. {
  163. using (await gate.LockAsync().ConfigureAwait(false))
  164. {
  165. await observer.OnCompletedAsync().ConfigureAwait(false);
  166. }
  167. }
  168. ),
  169. Create<TSecond>(
  170. async y =>
  171. {
  172. using (await gate.LockAsync().ConfigureAwait(false))
  173. {
  174. hasLatest = true;
  175. latest = y;
  176. }
  177. },
  178. OnErrorAsync,
  179. () => Task.CompletedTask
  180. )
  181. );
  182. }
  183. }
  184. }