GroupJoin.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Subjects;
  9. namespace System.Reactive.Linq.ObservableImpl
  10. {
  11. class GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult>
  12. {
  13. private readonly IObservable<TLeft> _left;
  14. private readonly IObservable<TRight> _right;
  15. private readonly Func<TLeft, IObservable<TLeftDuration>> _leftDurationSelector;
  16. private readonly Func<TRight, IObservable<TRightDuration>> _rightDurationSelector;
  17. private readonly Func<TLeft, IObservable<TRight>, TResult> _resultSelector;
  18. public GroupJoin(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector)
  19. {
  20. _left = left;
  21. _right = right;
  22. _leftDurationSelector = leftDurationSelector;
  23. _rightDurationSelector = rightDurationSelector;
  24. _resultSelector = resultSelector;
  25. }
  26. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  27. {
  28. var sink = new _(this, observer, cancel);
  29. setSink(sink);
  30. return sink.Run();
  31. }
  32. class _ : Sink<TResult>
  33. {
  34. private readonly GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> _parent;
  35. public _(GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  36. : base(observer, cancel)
  37. {
  38. _parent = parent;
  39. }
  40. private object _gate;
  41. private CompositeDisposable _group;
  42. private RefCountDisposable _refCount;
  43. private int _leftID;
  44. private SortedDictionary<int, IObserver<TRight>> _leftMap;
  45. private int _rightID;
  46. private SortedDictionary<int, TRight> _rightMap;
  47. public IDisposable Run()
  48. {
  49. _gate = new object();
  50. _group = new CompositeDisposable();
  51. _refCount = new RefCountDisposable(_group);
  52. var leftSubscription = new SingleAssignmentDisposable();
  53. _group.Add(leftSubscription);
  54. _leftID = 0;
  55. _leftMap = new SortedDictionary<int, IObserver<TRight>>();
  56. var rightSubscription = new SingleAssignmentDisposable();
  57. _group.Add(rightSubscription);
  58. _rightID = 0;
  59. _rightMap = new SortedDictionary<int, TRight>();
  60. leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
  61. rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
  62. return _refCount;
  63. }
  64. class LeftObserver : IObserver<TLeft>
  65. {
  66. private readonly _ _parent;
  67. private readonly IDisposable _self;
  68. public LeftObserver(_ parent, IDisposable self)
  69. {
  70. _parent = parent;
  71. _self = self;
  72. }
  73. private void Expire(int id, IObserver<TRight> group, IDisposable resource)
  74. {
  75. lock (_parent._gate)
  76. {
  77. if (_parent._leftMap.Remove(id))
  78. {
  79. group.OnCompleted();
  80. }
  81. }
  82. _parent._group.Remove(resource);
  83. }
  84. public void OnNext(TLeft value)
  85. {
  86. var s = new Subject<TRight>();
  87. var id = 0;
  88. var rightID = 0;
  89. lock (_parent._gate)
  90. {
  91. id = _parent._leftID++;
  92. rightID = _parent._rightID;
  93. _parent._leftMap.Add(id, s);
  94. }
  95. var window = new WindowObservable<TRight>(s, _parent._refCount);
  96. // BREAKING CHANGE v2 > v1.x - Order of evaluation or the _leftDurationSelector and _resultSelector now consistent with Join.
  97. var md = new SingleAssignmentDisposable();
  98. _parent._group.Add(md);
  99. var duration = default(IObservable<TLeftDuration>);
  100. try
  101. {
  102. duration = _parent._parent._leftDurationSelector(value);
  103. }
  104. catch (Exception exception)
  105. {
  106. OnError(exception);
  107. return;
  108. }
  109. // BREAKING CHANGE v2 > v1.x - The duration sequence is subscribed to before the result sequence is evaluated.
  110. md.Disposable = duration.SubscribeSafe(new Delta(this, id, s, md));
  111. var result = default(TResult);
  112. try
  113. {
  114. result = _parent._parent._resultSelector(value, window);
  115. }
  116. catch (Exception exception)
  117. {
  118. OnError(exception);
  119. return;
  120. }
  121. lock (_parent._gate)
  122. {
  123. _parent._observer.OnNext(result);
  124. foreach (var rightValue in _parent._rightMap)
  125. {
  126. if (rightValue.Key < rightID)
  127. {
  128. s.OnNext(rightValue.Value);
  129. }
  130. }
  131. }
  132. }
  133. class Delta : IObserver<TLeftDuration>
  134. {
  135. private readonly LeftObserver _parent;
  136. private readonly int _id;
  137. private readonly IObserver<TRight> _group;
  138. private readonly IDisposable _self;
  139. public Delta(LeftObserver parent, int id, IObserver<TRight> group, IDisposable self)
  140. {
  141. _parent = parent;
  142. _id = id;
  143. _group = group;
  144. _self = self;
  145. }
  146. public void OnNext(TLeftDuration value)
  147. {
  148. _parent.Expire(_id, _group, _self);
  149. }
  150. public void OnError(Exception error)
  151. {
  152. _parent.OnError(error);
  153. }
  154. public void OnCompleted()
  155. {
  156. _parent.Expire(_id, _group, _self);
  157. }
  158. }
  159. public void OnError(Exception error)
  160. {
  161. lock (_parent._gate)
  162. {
  163. foreach (var o in _parent._leftMap)
  164. {
  165. o.Value.OnError(error);
  166. }
  167. _parent._observer.OnError(error);
  168. _parent.Dispose();
  169. }
  170. }
  171. public void OnCompleted()
  172. {
  173. lock (_parent._gate)
  174. {
  175. _parent._observer.OnCompleted();
  176. _parent.Dispose();
  177. }
  178. _self.Dispose();
  179. }
  180. }
  181. class RightObserver : IObserver<TRight>
  182. {
  183. private readonly _ _parent;
  184. private readonly IDisposable _self;
  185. public RightObserver(_ parent, IDisposable self)
  186. {
  187. _parent = parent;
  188. _self = self;
  189. }
  190. private void Expire(int id, IDisposable resource)
  191. {
  192. lock (_parent._gate)
  193. {
  194. _parent._rightMap.Remove(id);
  195. }
  196. _parent._group.Remove(resource);
  197. }
  198. public void OnNext(TRight value)
  199. {
  200. var id = 0;
  201. var leftID = 0;
  202. lock (_parent._gate)
  203. {
  204. id = _parent._rightID++;
  205. leftID = _parent._leftID;
  206. _parent._rightMap.Add(id, value);
  207. }
  208. var md = new SingleAssignmentDisposable();
  209. _parent._group.Add(md);
  210. var duration = default(IObservable<TRightDuration>);
  211. try
  212. {
  213. duration = _parent._parent._rightDurationSelector(value);
  214. }
  215. catch (Exception exception)
  216. {
  217. OnError(exception);
  218. return;
  219. }
  220. md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
  221. lock (_parent._gate)
  222. {
  223. foreach (var o in _parent._leftMap)
  224. {
  225. if (o.Key < leftID)
  226. {
  227. o.Value.OnNext(value);
  228. }
  229. }
  230. }
  231. }
  232. class Delta : IObserver<TRightDuration>
  233. {
  234. private readonly RightObserver _parent;
  235. private readonly int _id;
  236. private readonly IDisposable _self;
  237. public Delta(RightObserver parent, int id, IDisposable self)
  238. {
  239. _parent = parent;
  240. _id = id;
  241. _self = self;
  242. }
  243. public void OnNext(TRightDuration value)
  244. {
  245. _parent.Expire(_id, _self);
  246. }
  247. public void OnError(Exception error)
  248. {
  249. _parent.OnError(error);
  250. }
  251. public void OnCompleted()
  252. {
  253. _parent.Expire(_id, _self);
  254. }
  255. }
  256. public void OnError(Exception error)
  257. {
  258. lock (_parent._gate)
  259. {
  260. foreach (var o in _parent._leftMap)
  261. {
  262. o.Value.OnError(error);
  263. }
  264. _parent._observer.OnError(error);
  265. _parent.Dispose();
  266. }
  267. }
  268. public void OnCompleted()
  269. {
  270. _self.Dispose();
  271. }
  272. }
  273. }
  274. }
  275. }
  276. #endif