Join.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. class Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult>
  9. {
  10. private readonly IObservable<TLeft> _left;
  11. private readonly IObservable<TRight> _right;
  12. private readonly Func<TLeft, IObservable<TLeftDuration>> _leftDurationSelector;
  13. private readonly Func<TRight, IObservable<TRightDuration>> _rightDurationSelector;
  14. private readonly Func<TLeft, TRight, TResult> _resultSelector;
  15. public Join(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector)
  16. {
  17. _left = left;
  18. _right = right;
  19. _leftDurationSelector = leftDurationSelector;
  20. _rightDurationSelector = rightDurationSelector;
  21. _resultSelector = resultSelector;
  22. }
  23. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  24. {
  25. var sink = new _(this, observer, cancel);
  26. setSink(sink);
  27. return sink.Run();
  28. }
  29. class _ : Sink<TResult>
  30. {
  31. private readonly Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> _parent;
  32. public _(Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  33. : base(observer, cancel)
  34. {
  35. _parent = parent;
  36. }
  37. private object _gate;
  38. private CompositeDisposable _group;
  39. private bool _leftDone;
  40. private int _leftID;
  41. private Dictionary<int, TLeft> _leftMap;
  42. private bool _rightDone;
  43. private int _rightID;
  44. private Dictionary<int, TRight> _rightMap;
  45. public IDisposable Run()
  46. {
  47. _gate = new object();
  48. _group = new CompositeDisposable();
  49. var leftSubscription = new SingleAssignmentDisposable();
  50. _group.Add(leftSubscription);
  51. _leftDone = false;
  52. _leftID = 0;
  53. _leftMap = new Dictionary<int, TLeft>();
  54. var rightSubscription = new SingleAssignmentDisposable();
  55. _group.Add(rightSubscription);
  56. _rightDone = false;
  57. _rightID = 0;
  58. _rightMap = new Dictionary<int, TRight>();
  59. leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
  60. rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
  61. return _group;
  62. }
  63. class LeftObserver : IObserver<TLeft>
  64. {
  65. private readonly _ _parent;
  66. private readonly IDisposable _self;
  67. public LeftObserver(_ parent, IDisposable self)
  68. {
  69. _parent = parent;
  70. _self = self;
  71. }
  72. private void Expire(int id, IDisposable resource)
  73. {
  74. lock (_parent._gate)
  75. {
  76. if (_parent._leftMap.Remove(id) && _parent._leftMap.Count == 0 && _parent._leftDone)
  77. {
  78. _parent._observer.OnCompleted();
  79. _parent.Dispose();
  80. }
  81. }
  82. _parent._group.Remove(resource);
  83. }
  84. public void OnNext(TLeft value)
  85. {
  86. var id = 0;
  87. lock (_parent._gate)
  88. {
  89. id = _parent._leftID++;
  90. _parent._leftMap.Add(id, value);
  91. }
  92. var md = new SingleAssignmentDisposable();
  93. _parent._group.Add(md);
  94. var duration = default(IObservable<TLeftDuration>);
  95. try
  96. {
  97. duration = _parent._parent._leftDurationSelector(value);
  98. }
  99. catch (Exception exception)
  100. {
  101. _parent._observer.OnError(exception);
  102. _parent.Dispose();
  103. return;
  104. }
  105. md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
  106. lock (_parent._gate)
  107. {
  108. foreach (var rightValue in _parent._rightMap.Values)
  109. {
  110. var result = default(TResult);
  111. try
  112. {
  113. result = _parent._parent._resultSelector(value, rightValue);
  114. }
  115. catch (Exception exception)
  116. {
  117. _parent._observer.OnError(exception);
  118. _parent.Dispose();
  119. return;
  120. }
  121. _parent._observer.OnNext(result);
  122. }
  123. }
  124. }
  125. class Delta : IObserver<TLeftDuration>
  126. {
  127. private readonly LeftObserver _parent;
  128. private readonly int _id;
  129. private readonly IDisposable _self;
  130. public Delta(LeftObserver parent, int id, IDisposable self)
  131. {
  132. _parent = parent;
  133. _id = id;
  134. _self = self;
  135. }
  136. public void OnNext(TLeftDuration value)
  137. {
  138. _parent.Expire(_id, _self);
  139. }
  140. public void OnError(Exception error)
  141. {
  142. _parent.OnError(error);
  143. }
  144. public void OnCompleted()
  145. {
  146. _parent.Expire(_id, _self);
  147. }
  148. }
  149. public void OnError(Exception error)
  150. {
  151. lock (_parent._gate)
  152. {
  153. _parent._observer.OnError(error);
  154. _parent.Dispose();
  155. }
  156. }
  157. public void OnCompleted()
  158. {
  159. lock (_parent._gate)
  160. {
  161. _parent._leftDone = true;
  162. if (_parent._rightDone || _parent._leftMap.Count == 0)
  163. {
  164. _parent._observer.OnCompleted();
  165. _parent.Dispose();
  166. }
  167. else
  168. {
  169. _self.Dispose();
  170. }
  171. }
  172. }
  173. }
  174. class RightObserver : IObserver<TRight>
  175. {
  176. private readonly _ _parent;
  177. private readonly IDisposable _self;
  178. public RightObserver(_ parent, IDisposable self)
  179. {
  180. _parent = parent;
  181. _self = self;
  182. }
  183. private void Expire(int id, IDisposable resource)
  184. {
  185. lock (_parent._gate)
  186. {
  187. if (_parent._rightMap.Remove(id) && _parent._rightMap.Count == 0 && _parent._rightDone)
  188. {
  189. _parent._observer.OnCompleted();
  190. _parent.Dispose();
  191. }
  192. }
  193. _parent._group.Remove(resource);
  194. }
  195. public void OnNext(TRight value)
  196. {
  197. var id = 0;
  198. lock (_parent._gate)
  199. {
  200. id = _parent._rightID++;
  201. _parent._rightMap.Add(id, value);
  202. }
  203. var md = new SingleAssignmentDisposable();
  204. _parent._group.Add(md);
  205. var duration = default(IObservable<TRightDuration>);
  206. try
  207. {
  208. duration = _parent._parent._rightDurationSelector(value);
  209. }
  210. catch (Exception exception)
  211. {
  212. _parent._observer.OnError(exception);
  213. _parent.Dispose();
  214. return;
  215. }
  216. md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
  217. lock (_parent._gate)
  218. {
  219. foreach (var leftValue in _parent._leftMap.Values)
  220. {
  221. var result = default(TResult);
  222. try
  223. {
  224. result = _parent._parent._resultSelector(leftValue, value);
  225. }
  226. catch (Exception exception)
  227. {
  228. _parent._observer.OnError(exception);
  229. _parent.Dispose();
  230. return;
  231. }
  232. _parent._observer.OnNext(result);
  233. }
  234. }
  235. }
  236. class Delta : IObserver<TRightDuration>
  237. {
  238. private readonly RightObserver _parent;
  239. private readonly int _id;
  240. private readonly IDisposable _self;
  241. public Delta(RightObserver parent, int id, IDisposable self)
  242. {
  243. _parent = parent;
  244. _id = id;
  245. _self = self;
  246. }
  247. public void OnNext(TRightDuration value)
  248. {
  249. _parent.Expire(_id, _self);
  250. }
  251. public void OnError(Exception error)
  252. {
  253. _parent.OnError(error);
  254. }
  255. public void OnCompleted()
  256. {
  257. _parent.Expire(_id, _self);
  258. }
  259. }
  260. public void OnError(Exception error)
  261. {
  262. lock (_parent._gate)
  263. {
  264. _parent._observer.OnError(error);
  265. _parent.Dispose();
  266. }
  267. }
  268. public void OnCompleted()
  269. {
  270. lock (_parent._gate)
  271. {
  272. _parent._rightDone = true;
  273. if (_parent._leftDone || _parent._rightMap.Count == 0)
  274. {
  275. _parent._observer.OnCompleted();
  276. _parent.Dispose();
  277. }
  278. else
  279. {
  280. _self.Dispose();
  281. }
  282. }
  283. }
  284. }
  285. }
  286. }
  287. }
  288. #endif