// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

#nullable disable

using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. internal sealed class GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult, GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>._>
  11. {
  12. private readonly IObservable<TLeft> _left;
  13. private readonly IObservable<TRight> _right;
  14. private readonly Func<TLeft, IObservable<TLeftDuration>> _leftDurationSelector;
  15. private readonly Func<TRight, IObservable<TRightDuration>> _rightDurationSelector;
  16. private readonly Func<TLeft, IObservable<TRight>, TResult> _resultSelector;
  17. public GroupJoin(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector)
  18. {
  19. _left = left;
  20. _right = right;
  21. _leftDurationSelector = leftDurationSelector;
  22. _rightDurationSelector = rightDurationSelector;
  23. _resultSelector = resultSelector;
  24. }
  25. protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);
  26. protected override void Run(_ sink) => sink.Run(this);
  27. internal sealed class _ : IdentitySink<TResult>
  28. {
  29. private readonly object _gate = new object();
  30. private readonly CompositeDisposable _group = new CompositeDisposable();
  31. private readonly RefCountDisposable _refCount;
  32. private readonly SortedDictionary<int, IObserver<TRight>> _leftMap;
  33. private readonly SortedDictionary<int, TRight> _rightMap;
  34. private readonly Func<TLeft, IObservable<TLeftDuration>> _leftDurationSelector;
  35. private readonly Func<TRight, IObservable<TRightDuration>> _rightDurationSelector;
  36. private readonly Func<TLeft, IObservable<TRight>, TResult> _resultSelector;
  37. public _(GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent, IObserver<TResult> observer)
  38. : base(observer)
  39. {
  40. _refCount = new RefCountDisposable(_group);
  41. _leftMap = new SortedDictionary<int, IObserver<TRight>>();
  42. _rightMap = new SortedDictionary<int, TRight>();
  43. _leftDurationSelector = parent._leftDurationSelector;
  44. _rightDurationSelector = parent._rightDurationSelector;
  45. _resultSelector = parent._resultSelector;
  46. }
  47. private int _leftID;
  48. private int _rightID;
  49. public void Run(GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent)
  50. {
  51. var leftObserver = new LeftObserver(this);
  52. _group.Add(leftObserver);
  53. var rightObserver = new RightObserver(this);
  54. _group.Add(rightObserver);
  55. leftObserver.SetResource(parent._left.SubscribeSafe(leftObserver));
  56. rightObserver.SetResource(parent._right.SubscribeSafe(rightObserver));
  57. SetUpstream(_refCount);
  58. }
  59. private sealed class LeftObserver : SafeObserver<TLeft>
  60. {
  61. private readonly _ _parent;
  62. public LeftObserver(_ parent)
  63. {
  64. _parent = parent;
  65. }
  66. private void Expire(int id, IObserver<TRight> group, IDisposable resource)
  67. {
  68. lock (_parent._gate)
  69. {
  70. if (_parent._leftMap.Remove(id))
  71. {
  72. group.OnCompleted();
  73. }
  74. }
  75. _parent._group.Remove(resource);
  76. }
  77. public override void OnNext(TLeft value)
  78. {
  79. var s = new Subject<TRight>();
  80. int id;
  81. int rightID;
  82. lock (_parent._gate)
  83. {
  84. id = _parent._leftID++;
  85. rightID = _parent._rightID;
  86. _parent._leftMap.Add(id, s);
  87. }
  88. var window = new WindowObservable<TRight>(s, _parent._refCount);
  89. // BREAKING CHANGE v2 > v1.x - Order of evaluation or the _leftDurationSelector and _resultSelector now consistent with Join.
  90. IObservable<TLeftDuration> duration;
  91. try
  92. {
  93. duration = _parent._leftDurationSelector(value);
  94. }
  95. catch (Exception exception)
  96. {
  97. OnError(exception);
  98. return;
  99. }
  100. var durationObserver = new DurationObserver(this, id, s);
  101. _parent._group.Add(durationObserver);
  102. // BREAKING CHANGE v2 > v1.x - The duration sequence is subscribed to before the result sequence is evaluated.
  103. durationObserver.SetResource(duration.SubscribeSafe(durationObserver));
  104. TResult result;
  105. try
  106. {
  107. result = _parent._resultSelector(value, window);
  108. }
  109. catch (Exception exception)
  110. {
  111. OnError(exception);
  112. return;
  113. }
  114. lock (_parent._gate)
  115. {
  116. _parent.ForwardOnNext(result);
  117. foreach (var rightValue in _parent._rightMap)
  118. {
  119. if (rightValue.Key < rightID)
  120. {
  121. s.OnNext(rightValue.Value);
  122. }
  123. }
  124. }
  125. }
  126. private sealed class DurationObserver : SafeObserver<TLeftDuration>
  127. {
  128. private readonly LeftObserver _parent;
  129. private readonly int _id;
  130. private readonly IObserver<TRight> _group;
  131. public DurationObserver(LeftObserver parent, int id, IObserver<TRight> group)
  132. {
  133. _parent = parent;
  134. _id = id;
  135. _group = group;
  136. }
  137. public override void OnNext(TLeftDuration value)
  138. {
  139. _parent.Expire(_id, _group, this);
  140. }
  141. public override void OnError(Exception error)
  142. {
  143. _parent.OnError(error);
  144. }
  145. public override void OnCompleted()
  146. {
  147. _parent.Expire(_id, _group, this);
  148. }
  149. }
  150. public override void OnError(Exception error)
  151. {
  152. lock (_parent._gate)
  153. {
  154. foreach (var o in _parent._leftMap)
  155. {
  156. o.Value.OnError(error);
  157. }
  158. _parent.ForwardOnError(error);
  159. }
  160. }
  161. public override void OnCompleted()
  162. {
  163. lock (_parent._gate)
  164. {
  165. _parent.ForwardOnCompleted();
  166. }
  167. Dispose();
  168. }
  169. }
  170. private sealed class RightObserver : SafeObserver<TRight>
  171. {
  172. private readonly _ _parent;
  173. public RightObserver(_ parent)
  174. {
  175. _parent = parent;
  176. }
  177. private void Expire(int id, IDisposable resource)
  178. {
  179. lock (_parent._gate)
  180. {
  181. _parent._rightMap.Remove(id);
  182. }
  183. _parent._group.Remove(resource);
  184. }
  185. public override void OnNext(TRight value)
  186. {
  187. int id;
  188. int leftID;
  189. lock (_parent._gate)
  190. {
  191. id = _parent._rightID++;
  192. leftID = _parent._leftID;
  193. _parent._rightMap.Add(id, value);
  194. }
  195. IObservable<TRightDuration> duration;
  196. try
  197. {
  198. duration = _parent._rightDurationSelector(value);
  199. }
  200. catch (Exception exception)
  201. {
  202. OnError(exception);
  203. return;
  204. }
  205. var durationObserver = new DurationObserver(this, id);
  206. _parent._group.Add(durationObserver);
  207. durationObserver.SetResource(duration.SubscribeSafe(durationObserver));
  208. lock (_parent._gate)
  209. {
  210. foreach (var o in _parent._leftMap)
  211. {
  212. if (o.Key < leftID)
  213. {
  214. o.Value.OnNext(value);
  215. }
  216. }
  217. }
  218. }
  219. private sealed class DurationObserver : SafeObserver<TRightDuration>
  220. {
  221. private readonly RightObserver _parent;
  222. private readonly int _id;
  223. public DurationObserver(RightObserver parent, int id)
  224. {
  225. _parent = parent;
  226. _id = id;
  227. }
  228. public override void OnNext(TRightDuration value)
  229. {
  230. _parent.Expire(_id, this);
  231. }
  232. public override void OnError(Exception error)
  233. {
  234. _parent.OnError(error);
  235. }
  236. public override void OnCompleted()
  237. {
  238. _parent.Expire(_id, this);
  239. }
  240. }
  241. public override void OnError(Exception error)
  242. {
  243. lock (_parent._gate)
  244. {
  245. foreach (var o in _parent._leftMap)
  246. {
  247. o.Value.OnError(error);
  248. }
  249. _parent.ForwardOnError(error);
  250. }
  251. }
  252. public override void OnCompleted()
  253. {
  254. Dispose();
  255. }
  256. }
  257. }
  258. }
  259. }