Join.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  /// <summary>
  /// Producer behind the Join operator: correlates elements of a left and a right sequence whose
  /// duration windows overlap and projects every correlated pair through a result selector.
  /// </summary>
  /// <typeparam name="TLeft">Element type of the left source.</typeparam>
  /// <typeparam name="TRight">Element type of the right source.</typeparam>
  /// <typeparam name="TLeftDuration">Element type of the duration sequences created for left elements.</typeparam>
  /// <typeparam name="TRightDuration">Element type of the duration sequences created for right elements.</typeparam>
  /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
  class Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult>
  {
      private readonly IObservable<TLeft> _left;
      private readonly IObservable<TRight> _right;
      private readonly Func<TLeft, IObservable<TLeftDuration>> _leftDurationSelector;
      private readonly Func<TRight, IObservable<TRightDuration>> _rightDurationSelector;
      private readonly Func<TLeft, TRight, TResult> _resultSelector;

      /// <summary>
      /// Captures the sources and selectors; no subscription happens until <see cref="Run"/> is invoked.
      /// </summary>
      /// <param name="left">Left source sequence.</param>
      /// <param name="right">Right source sequence.</param>
      /// <param name="leftDurationSelector">Maps a left element to the sequence whose first notification ends that element's window.</param>
      /// <param name="rightDurationSelector">Maps a right element to the sequence whose first notification ends that element's window.</param>
      /// <param name="resultSelector">Projects a correlated (left, right) pair into a result element.</param>
      public Join(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector)
      {
          _left = left;
          _right = right;
          _leftDurationSelector = leftDurationSelector;
          _rightDurationSelector = rightDurationSelector;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink for one subscription, publishes it through <paramref name="setSink"/> and starts it.
      /// </summary>
      /// <param name="observer">Observer receiving the joined results.</param>
      /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
      /// <param name="setSink">Callback that receives the newly created sink.</param>
      /// <returns>The disposable returned by the sink's <c>Run</c> method (the group of all inner subscriptions).</returns>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var sink = new _(this, observer, cancel);
          setSink(sink);
          return sink.Run();
      }

      /// <summary>
      /// Per-subscription state: the currently open left/right values keyed by arrival id, plus the
      /// gate under which all notifications to the downstream observer are issued.
      /// </summary>
      class _ : Sink<TResult>
      {
          private readonly Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> _parent;

          public _(Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          // Guards the maps, the id counters, the done flags and every call into _observer.
          private object _gate;

          // Holds the two source subscriptions and one subscription per open duration sequence.
          private CompositeDisposable _group;

          private bool _leftDone;
          private int _leftID;
          private SortedDictionary<int, TLeft> _leftMap;

          private bool _rightDone;
          private int _rightID;
          private SortedDictionary<int, TRight> _rightMap;

          /// <summary>
          /// Initializes the per-subscription state and subscribes to the left source, then the right source.
          /// </summary>
          /// <returns>The group containing both source subscriptions and all duration subscriptions.</returns>
          public IDisposable Run()
          {
              _gate = new object();
              _group = new CompositeDisposable();

              var leftSubscription = new SingleAssignmentDisposable();
              _group.Add(leftSubscription);
              _leftDone = false;
              _leftID = 0;
              _leftMap = new SortedDictionary<int, TLeft>();

              var rightSubscription = new SingleAssignmentDisposable();
              _group.Add(rightSubscription);
              _rightDone = false;
              _rightID = 0;
              _rightMap = new SortedDictionary<int, TRight>();

              // All state is set up before subscribing because either source may notify during SubscribeSafe.
              leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
              rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));

              return _group;
          }

          /// <summary>
          /// Observer for the left source: opens a window per element and pairs it with the open right values.
          /// </summary>
          class LeftObserver : IObserver<TLeft>
          {
              private readonly _ _parent;
              private readonly IDisposable _self;

              /// <param name="parent">Sink owning the shared state.</param>
              /// <param name="self">Handle of this observer's own subscription to the left source.</param>
              public LeftObserver(_ parent, IDisposable self)
              {
                  _parent = parent;
                  _self = self;
              }

              /// <summary>
              /// Closes the window of the left value with the given id. If that empties the left map after
              /// the left source has completed, the result sequence is completed. The duration subscription
              /// is then removed from the group.
              /// </summary>
              private void Expire(int id, IDisposable resource)
              {
                  lock (_parent._gate)
                  {
                      // Remove returns false when the id was already expired, so completion fires at most once from here.
                      if (_parent._leftMap.Remove(id) && _parent._leftMap.Count == 0 && _parent._leftDone)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                  }

                  _parent._group.Remove(resource);
              }

              /// <summary>
              /// Registers the value, subscribes to its duration sequence and emits one result per right
              /// value that was already open when this value arrived.
              /// </summary>
              public void OnNext(TLeft value)
              {
                  var id = 0;
                  var rightID = 0;
                  lock (_parent._gate)
                  {
                      id = _parent._leftID++;
                      // Snapshot of the right counter: right values with an id at or above it arrive after this
                      // value was registered and are paired with it by RightObserver.OnNext instead.
                      rightID = _parent._rightID;
                      _parent._leftMap.Add(id, value);
                  }

                  var md = new SingleAssignmentDisposable();
                  _parent._group.Add(md);

                  var duration = default(IObservable<TLeftDuration>);
                  try
                  {
                      duration = _parent._parent._leftDurationSelector(value);
                  }
                  catch (Exception exception)
                  {
                      // Route through OnError so the downstream notification is issued under the gate, like
                      // every other notification; the right side may be emitting concurrently.
                      OnError(exception);
                      return;
                  }

                  // The duration sequence is subscribed to before the results for this value are produced.
                  md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));

                  lock (_parent._gate)
                  {
                      foreach (var rightValue in _parent._rightMap)
                      {
                          if (rightValue.Key < rightID)
                          {
                              var result = default(TResult);
                              try
                              {
                                  result = _parent._parent._resultSelector(value, rightValue.Value);
                              }
                              catch (Exception exception)
                              {
                                  _parent._observer.OnError(exception);
                                  _parent.Dispose();
                                  return;
                              }

                              _parent._observer.OnNext(result);
                          }
                      }
                  }
              }

              /// <summary>
              /// Observer for the duration sequence of a single left value; its first element or its
              /// completion expires that value, and an error is forwarded to the result sequence.
              /// </summary>
              class Delta : IObserver<TLeftDuration>
              {
                  private readonly LeftObserver _parent;
                  private readonly int _id;
                  private readonly IDisposable _self;

                  /// <param name="parent">Left observer that owns the value.</param>
                  /// <param name="id">Id of the left value this duration belongs to.</param>
                  /// <param name="self">Handle of the subscription to the duration sequence.</param>
                  public Delta(LeftObserver parent, int id, IDisposable self)
                  {
                      _parent = parent;
                      _id = id;
                      _self = self;
                  }

                  public void OnNext(TLeftDuration value)
                  {
                      _parent.Expire(_id, _self);
                  }

                  public void OnError(Exception error)
                  {
                      _parent.OnError(error);
                  }

                  public void OnCompleted()
                  {
                      _parent.Expire(_id, _self);
                  }
              }

              /// <summary>
              /// Forwards the error downstream under the gate and disposes the sink.
              /// </summary>
              public void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent._observer.OnError(error);
                      _parent.Dispose();
                  }
              }

              /// <summary>
              /// Marks the left source as done. The result sequence completes when the right source is also
              /// done or no left value is open any more; otherwise only this source subscription is released.
              /// </summary>
              public void OnCompleted()
              {
                  lock (_parent._gate)
                  {
                      _parent._leftDone = true;
                      if (_parent._rightDone || _parent._leftMap.Count == 0)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                      else
                      {
                          _self.Dispose();
                      }
                  }
              }
          }

          /// <summary>
          /// Observer for the right source: opens a window per element and pairs it with the open left values.
          /// </summary>
          class RightObserver : IObserver<TRight>
          {
              private readonly _ _parent;
              private readonly IDisposable _self;

              /// <param name="parent">Sink owning the shared state.</param>
              /// <param name="self">Handle of this observer's own subscription to the right source.</param>
              public RightObserver(_ parent, IDisposable self)
              {
                  _parent = parent;
                  _self = self;
              }

              /// <summary>
              /// Closes the window of the right value with the given id. If that empties the right map after
              /// the right source has completed, the result sequence is completed. The duration subscription
              /// is then removed from the group.
              /// </summary>
              private void Expire(int id, IDisposable resource)
              {
                  lock (_parent._gate)
                  {
                      // Remove returns false when the id was already expired, so completion fires at most once from here.
                      if (_parent._rightMap.Remove(id) && _parent._rightMap.Count == 0 && _parent._rightDone)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                  }

                  _parent._group.Remove(resource);
              }

              /// <summary>
              /// Registers the value, subscribes to its duration sequence and emits one result per left
              /// value that was already open when this value arrived.
              /// </summary>
              public void OnNext(TRight value)
              {
                  var id = 0;
                  var leftID = 0;
                  lock (_parent._gate)
                  {
                      id = _parent._rightID++;
                      // Snapshot of the left counter: left values with an id at or above it arrive after this
                      // value was registered and are paired with it by LeftObserver.OnNext instead.
                      leftID = _parent._leftID;
                      _parent._rightMap.Add(id, value);
                  }

                  var md = new SingleAssignmentDisposable();
                  _parent._group.Add(md);

                  var duration = default(IObservable<TRightDuration>);
                  try
                  {
                      duration = _parent._parent._rightDurationSelector(value);
                  }
                  catch (Exception exception)
                  {
                      // Route through OnError so the downstream notification is issued under the gate, like
                      // every other notification; the left side may be emitting concurrently.
                      OnError(exception);
                      return;
                  }

                  // The duration sequence is subscribed to before the results for this value are produced.
                  md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));

                  lock (_parent._gate)
                  {
                      foreach (var leftValue in _parent._leftMap)
                      {
                          if (leftValue.Key < leftID)
                          {
                              var result = default(TResult);
                              try
                              {
                                  result = _parent._parent._resultSelector(leftValue.Value, value);
                              }
                              catch (Exception exception)
                              {
                                  _parent._observer.OnError(exception);
                                  _parent.Dispose();
                                  return;
                              }

                              _parent._observer.OnNext(result);
                          }
                      }
                  }
              }

              /// <summary>
              /// Observer for the duration sequence of a single right value; its first element or its
              /// completion expires that value, and an error is forwarded to the result sequence.
              /// </summary>
              class Delta : IObserver<TRightDuration>
              {
                  private readonly RightObserver _parent;
                  private readonly int _id;
                  private readonly IDisposable _self;

                  /// <param name="parent">Right observer that owns the value.</param>
                  /// <param name="id">Id of the right value this duration belongs to.</param>
                  /// <param name="self">Handle of the subscription to the duration sequence.</param>
                  public Delta(RightObserver parent, int id, IDisposable self)
                  {
                      _parent = parent;
                      _id = id;
                      _self = self;
                  }

                  public void OnNext(TRightDuration value)
                  {
                      _parent.Expire(_id, _self);
                  }

                  public void OnError(Exception error)
                  {
                      _parent.OnError(error);
                  }

                  public void OnCompleted()
                  {
                      _parent.Expire(_id, _self);
                  }
              }

              /// <summary>
              /// Forwards the error downstream under the gate and disposes the sink.
              /// </summary>
              public void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent._observer.OnError(error);
                      _parent.Dispose();
                  }
              }

              /// <summary>
              /// Marks the right source as done. The result sequence completes when the left source is also
              /// done or no right value is open any more; otherwise only this source subscription is released.
              /// </summary>
              public void OnCompleted()
              {
                  lock (_parent._gate)
                  {
                      _parent._rightDone = true;
                      if (_parent._leftDone || _parent._rightMap.Count == 0)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                      else
                      {
                          _self.Dispose();
                      }
                  }
              }
          }
      }
  }
  297. }
  298. #endif