GroupJoin.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Reactive.Linq
  10. {
  11. partial class AsyncObservable
  12. {
  13. public static IAsyncObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(this IAsyncObservable<TLeft> left, IAsyncObservable<TRight> right, Func<TLeft, IAsyncObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IAsyncObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IAsyncObservable<TRight>, TResult> resultSelector)
  14. {
  15. if (left == null)
  16. throw new ArgumentNullException(nameof(left));
  17. if (right == null)
  18. throw new ArgumentNullException(nameof(right));
  19. if (leftDurationSelector == null)
  20. throw new ArgumentNullException(nameof(leftDurationSelector));
  21. if (rightDurationSelector == null)
  22. throw new ArgumentNullException(nameof(rightDurationSelector));
  23. if (resultSelector == null)
  24. throw new ArgumentNullException(nameof(resultSelector));
  25. return Create<TResult>(async observer =>
  26. {
  27. var subscriptions = new CompositeAsyncDisposable();
  28. var (leftObserver, rightObserver, disposable) = AsyncObserver.GroupJoin(observer, subscriptions, leftDurationSelector, rightDurationSelector, resultSelector);
  29. var leftSubscription = await left.SubscribeSafeAsync(leftObserver).ConfigureAwait(false);
  30. await subscriptions.AddAsync(leftSubscription).ConfigureAwait(false);
  31. var rightSubscription = await right.SubscribeSafeAsync(rightObserver).ConfigureAwait(false);
  32. await subscriptions.AddAsync(rightSubscription).ConfigureAwait(false);
  33. return disposable;
  34. });
  35. }
  36. }
  37. partial class AsyncObserver
  38. {
  39. public static (IAsyncObserver<TLeft>, IAsyncObserver<TRight>, IAsyncDisposable) GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IAsyncObserver<TResult> observer, IAsyncDisposable subscriptions, Func<TLeft, IAsyncObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IAsyncObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IAsyncObservable<TRight>, TResult> resultSelector)
  40. {
  41. if (observer == null)
  42. throw new ArgumentNullException(nameof(observer));
  43. if (subscriptions == null)
  44. throw new ArgumentNullException(nameof(subscriptions));
  45. if (leftDurationSelector == null)
  46. throw new ArgumentNullException(nameof(leftDurationSelector));
  47. if (rightDurationSelector == null)
  48. throw new ArgumentNullException(nameof(rightDurationSelector));
  49. if (resultSelector == null)
  50. throw new ArgumentNullException(nameof(resultSelector));
  51. var gate = new AsyncLock();
  52. var group = new CompositeAsyncDisposable(subscriptions);
  53. var refCount = new RefCountAsyncDisposable(group);
  54. var leftMap = new SortedDictionary<int, IAsyncObserver<TRight>>();
  55. var rightMap = new SortedDictionary<int, TRight>();
  56. var leftId = default(int);
  57. var rightId = default(int);
  58. async Task OnErrorAsync(Exception ex)
  59. {
  60. using (await gate.LockAsync().ConfigureAwait(false))
  61. {
  62. foreach (var o in leftMap)
  63. {
  64. await o.Value.OnErrorAsync(ex).ConfigureAwait(false);
  65. }
  66. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  67. }
  68. }
  69. var leftObserver =
  70. Create<TLeft>(
  71. async x =>
  72. {
  73. var s = new SequentialSimpleAsyncSubject<TRight>();
  74. var theLeftId = default(int);
  75. var theRightId = default(int);
  76. using (await gate.LockAsync().ConfigureAwait(false))
  77. {
  78. theLeftId = leftId++;
  79. theRightId = rightId;
  80. leftMap.Add(theLeftId, s);
  81. }
  82. var duration = default(IAsyncObservable<TLeftDuration>);
  83. try
  84. {
  85. duration = leftDurationSelector(x);
  86. }
  87. catch (Exception ex)
  88. {
  89. await OnErrorAsync(ex).ConfigureAwait(false);
  90. return;
  91. }
  92. var sad = new SingleAssignmentAsyncDisposable();
  93. await group.AddAsync(sad).ConfigureAwait(false);
  94. var durationObserver =
  95. Create<TLeftDuration>(
  96. d => Task.CompletedTask,
  97. OnErrorAsync,
  98. async () =>
  99. {
  100. using (await gate.LockAsync().ConfigureAwait(false))
  101. {
  102. if (leftMap.Remove(theLeftId))
  103. {
  104. await s.OnCompletedAsync().ConfigureAwait(false);
  105. }
  106. }
  107. await group.RemoveAsync(sad).ConfigureAwait(false);
  108. }
  109. );
  110. var durationSubscription = await duration.FirstOrDefault().SubscribeSafeAsync(durationObserver).ConfigureAwait(false);
  111. await sad.AssignAsync(durationSubscription).ConfigureAwait(false);
  112. var window = new WindowAsyncObservable<TRight>(s, refCount);
  113. var result = default(TResult);
  114. try
  115. {
  116. result = resultSelector(x, window);
  117. }
  118. catch (Exception ex)
  119. {
  120. await OnErrorAsync(ex).ConfigureAwait(false);
  121. return;
  122. }
  123. using (await gate.LockAsync().ConfigureAwait(false))
  124. {
  125. await observer.OnNextAsync(result).ConfigureAwait(false);
  126. foreach (var rightValue in rightMap)
  127. {
  128. if (rightValue.Key < theRightId)
  129. {
  130. await s.OnNextAsync(rightValue.Value).ConfigureAwait(false);
  131. }
  132. }
  133. }
  134. },
  135. OnErrorAsync,
  136. async () =>
  137. {
  138. using (await gate.LockAsync().ConfigureAwait(false))
  139. {
  140. await observer.OnCompletedAsync().ConfigureAwait(false);
  141. }
  142. }
  143. );
  144. var rightObserver =
  145. Create<TRight>(
  146. async x =>
  147. {
  148. var theLeftId = 0;
  149. var theRightId = 0;
  150. using (await gate.LockAsync().ConfigureAwait(false))
  151. {
  152. theRightId = rightId++;
  153. theLeftId = leftId;
  154. rightMap.Add(theRightId, x);
  155. }
  156. var duration = default(IAsyncObservable<TRightDuration>);
  157. try
  158. {
  159. duration = rightDurationSelector(x);
  160. }
  161. catch (Exception ex)
  162. {
  163. await OnErrorAsync(ex).ConfigureAwait(false);
  164. return;
  165. }
  166. var sad = new SingleAssignmentAsyncDisposable();
  167. await group.AddAsync(sad).ConfigureAwait(false);
  168. var durationObserver =
  169. Create<TRightDuration>(
  170. d => Task.CompletedTask,
  171. OnErrorAsync,
  172. async () =>
  173. {
  174. using (await gate.LockAsync().ConfigureAwait(false))
  175. {
  176. rightMap.Remove(theRightId);
  177. }
  178. await group.RemoveAsync(sad).ConfigureAwait(false);
  179. }
  180. );
  181. var durationSubscription = await duration.FirstOrDefault().SubscribeSafeAsync(durationObserver).ConfigureAwait(false);
  182. await sad.AssignAsync(durationSubscription).ConfigureAwait(false);
  183. using (await gate.LockAsync().ConfigureAwait(false))
  184. {
  185. foreach (var o in leftMap)
  186. {
  187. if (o.Key < theLeftId)
  188. {
  189. await o.Value.OnNextAsync(x).ConfigureAwait(false);
  190. }
  191. }
  192. }
  193. },
  194. OnErrorAsync,
  195. () => Task.CompletedTask
  196. );
  197. return (leftObserver, rightObserver, refCount);
  198. }
  199. }
  200. }