1
0

CombineLatest.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Collections.ObjectModel;
  6. using System.Linq;
  7. using System.Reactive.Disposables;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. #region Binary
  11. class CombineLatest<TFirst, TSecond, TResult> : Producer<TResult>
  12. {
  13. private readonly IObservable<TFirst> _first;
  14. private readonly IObservable<TSecond> _second;
  15. private readonly Func<TFirst, TSecond, TResult> _resultSelector;
  16. public CombineLatest(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  17. {
  18. _first = first;
  19. _second = second;
  20. _resultSelector = resultSelector;
  21. }
  22. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  23. {
  24. var sink = new _(this, observer, cancel);
  25. setSink(sink);
  26. return sink.Run();
  27. }
  28. class _ : Sink<TResult>
  29. {
  30. private readonly CombineLatest<TFirst, TSecond, TResult> _parent;
  31. public _(CombineLatest<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  32. : base(observer, cancel)
  33. {
  34. _parent = parent;
  35. }
  36. private object _gate;
  37. public IDisposable Run()
  38. {
  39. _gate = new object();
  40. var fstSubscription = new SingleAssignmentDisposable();
  41. var sndSubscription = new SingleAssignmentDisposable();
  42. var fstO = new F(this, fstSubscription);
  43. var sndO = new S(this, sndSubscription);
  44. fstO.Other = sndO;
  45. sndO.Other = fstO;
  46. fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);
  47. sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);
  48. return StableCompositeDisposable.Create(fstSubscription, sndSubscription);
  49. }
  50. class F : IObserver<TFirst>
  51. {
  52. private readonly _ _parent;
  53. private readonly IDisposable _self;
  54. private S _other;
  55. public F(_ parent, IDisposable self)
  56. {
  57. _parent = parent;
  58. _self = self;
  59. }
  60. public S Other { set { _other = value; } }
  61. public bool HasValue { get; private set; }
  62. public TFirst Value { get; private set; }
  63. public bool Done { get; private set; }
  64. public void OnNext(TFirst value)
  65. {
  66. lock (_parent._gate)
  67. {
  68. HasValue = true;
  69. Value = value;
  70. if (_other.HasValue)
  71. {
  72. var res = default(TResult);
  73. try
  74. {
  75. res = _parent._parent._resultSelector(value, _other.Value);
  76. }
  77. catch (Exception ex)
  78. {
  79. _parent._observer.OnError(ex);
  80. _parent.Dispose();
  81. return;
  82. }
  83. _parent._observer.OnNext(res);
  84. }
  85. else if (_other.Done)
  86. {
  87. _parent._observer.OnCompleted();
  88. _parent.Dispose();
  89. return;
  90. }
  91. }
  92. }
  93. public void OnError(Exception error)
  94. {
  95. lock (_parent._gate)
  96. {
  97. _parent._observer.OnError(error);
  98. _parent.Dispose();
  99. }
  100. }
  101. public void OnCompleted()
  102. {
  103. lock (_parent._gate)
  104. {
  105. Done = true;
  106. if (_other.Done)
  107. {
  108. _parent._observer.OnCompleted();
  109. _parent.Dispose();
  110. return;
  111. }
  112. else
  113. {
  114. _self.Dispose();
  115. }
  116. }
  117. }
  118. }
  119. class S : IObserver<TSecond>
  120. {
  121. private readonly _ _parent;
  122. private readonly IDisposable _self;
  123. private F _other;
  124. public S(_ parent, IDisposable self)
  125. {
  126. _parent = parent;
  127. _self = self;
  128. }
  129. public F Other { set { _other = value; } }
  130. public bool HasValue { get; private set; }
  131. public TSecond Value { get; private set; }
  132. public bool Done { get; private set; }
  133. public void OnNext(TSecond value)
  134. {
  135. lock (_parent._gate)
  136. {
  137. HasValue = true;
  138. Value = value;
  139. if (_other.HasValue)
  140. {
  141. var res = default(TResult);
  142. try
  143. {
  144. res = _parent._parent._resultSelector(_other.Value, value);
  145. }
  146. catch (Exception ex)
  147. {
  148. _parent._observer.OnError(ex);
  149. _parent.Dispose();
  150. return;
  151. }
  152. _parent._observer.OnNext(res);
  153. }
  154. else if (_other.Done)
  155. {
  156. _parent._observer.OnCompleted();
  157. _parent.Dispose();
  158. return;
  159. }
  160. }
  161. }
  162. public void OnError(Exception error)
  163. {
  164. lock (_parent._gate)
  165. {
  166. _parent._observer.OnError(error);
  167. _parent.Dispose();
  168. }
  169. }
  170. public void OnCompleted()
  171. {
  172. lock (_parent._gate)
  173. {
  174. Done = true;
  175. if (_other.Done)
  176. {
  177. _parent._observer.OnCompleted();
  178. _parent.Dispose();
  179. return;
  180. }
  181. else
  182. {
  183. _self.Dispose();
  184. }
  185. }
  186. }
  187. }
  188. }
  189. }
  190. #endregion
  191. #region [3,16]-ary
  192. #region Helpers for n-ary overloads
  193. interface ICombineLatest
  194. {
  195. void Next(int index);
  196. void Fail(Exception error);
  197. void Done(int index);
  198. }
  199. abstract class CombineLatestSink<TResult> : Sink<TResult>, ICombineLatest
  200. {
  201. protected readonly object _gate;
  202. private bool _hasValueAll;
  203. private readonly bool[] _hasValue;
  204. private readonly bool[] _isDone;
  205. public CombineLatestSink(int arity, IObserver<TResult> observer, IDisposable cancel)
  206. : base(observer, cancel)
  207. {
  208. _gate = new object();
  209. _hasValue = new bool[arity];
  210. _isDone = new bool[arity];
  211. }
  212. public void Next(int index)
  213. {
  214. if (!_hasValueAll)
  215. {
  216. _hasValue[index] = true;
  217. var hasValueAll = true;
  218. foreach (var hasValue in _hasValue)
  219. {
  220. if (!hasValue)
  221. {
  222. hasValueAll = false;
  223. break;
  224. }
  225. }
  226. _hasValueAll = hasValueAll;
  227. }
  228. if (_hasValueAll)
  229. {
  230. var res = default(TResult);
  231. try
  232. {
  233. res = GetResult();
  234. }
  235. catch (Exception ex)
  236. {
  237. base._observer.OnError(ex);
  238. base.Dispose();
  239. return;
  240. }
  241. base._observer.OnNext(res);
  242. }
  243. else
  244. {
  245. var allOthersDone = true;
  246. for (int i = 0; i < _isDone.Length; i++)
  247. {
  248. if (i != index && !_isDone[i])
  249. {
  250. allOthersDone = false;
  251. break;
  252. }
  253. }
  254. if (allOthersDone)
  255. {
  256. base._observer.OnCompleted();
  257. base.Dispose();
  258. }
  259. }
  260. }
  261. protected abstract TResult GetResult();
  262. public void Fail(Exception error)
  263. {
  264. base._observer.OnError(error);
  265. base.Dispose();
  266. }
  267. public void Done(int index)
  268. {
  269. _isDone[index] = true;
  270. var allDone = true;
  271. foreach (var isDone in _isDone)
  272. {
  273. if (!isDone)
  274. {
  275. allDone = false;
  276. break;
  277. }
  278. }
  279. if (allDone)
  280. {
  281. base._observer.OnCompleted();
  282. base.Dispose();
  283. return;
  284. }
  285. }
  286. }
  287. class CombineLatestObserver<T> : IObserver<T>
  288. {
  289. private readonly object _gate;
  290. private readonly ICombineLatest _parent;
  291. private readonly int _index;
  292. private readonly IDisposable _self;
  293. private T _value;
  294. public CombineLatestObserver(object gate, ICombineLatest parent, int index, IDisposable self)
  295. {
  296. _gate = gate;
  297. _parent = parent;
  298. _index = index;
  299. _self = self;
  300. }
  301. public T Value
  302. {
  303. get { return _value; }
  304. }
  305. public void OnNext(T value)
  306. {
  307. lock (_gate)
  308. {
  309. _value = value;
  310. _parent.Next(_index);
  311. }
  312. }
  313. public void OnError(Exception error)
  314. {
  315. _self.Dispose();
  316. lock (_gate)
  317. {
  318. _parent.Fail(error);
  319. }
  320. }
  321. public void OnCompleted()
  322. {
  323. _self.Dispose();
  324. lock (_gate)
  325. {
  326. _parent.Done(_index);
  327. }
  328. }
  329. }
  330. #endregion
  331. #endregion
  332. #region N-ary
  333. class CombineLatest<TSource, TResult> : Producer<TResult>
  334. {
  335. private readonly IEnumerable<IObservable<TSource>> _sources;
  336. private readonly Func<IList<TSource>, TResult> _resultSelector;
  337. public CombineLatest(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  338. {
  339. _sources = sources;
  340. _resultSelector = resultSelector;
  341. }
  342. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  343. {
  344. var sink = new _(this, observer, cancel);
  345. setSink(sink);
  346. return sink.Run();
  347. }
  348. class _ : Sink<TResult>
  349. {
  350. private readonly CombineLatest<TSource, TResult> _parent;
  351. public _(CombineLatest<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  352. : base(observer, cancel)
  353. {
  354. _parent = parent;
  355. }
  356. private object _gate;
  357. private bool[] _hasValue;
  358. private bool _hasValueAll;
  359. private List<TSource> _values;
  360. private bool[] _isDone;
  361. private IDisposable[] _subscriptions;
  362. public IDisposable Run()
  363. {
  364. var srcs = _parent._sources.ToArray();
  365. var N = srcs.Length;
  366. _hasValue = new bool[N];
  367. _hasValueAll = false;
  368. _values = new List<TSource>(N);
  369. for (int i = 0; i < N; i++)
  370. _values.Add(default(TSource));
  371. _isDone = new bool[N];
  372. _subscriptions = new IDisposable[N];
  373. _gate = new object();
  374. for (int i = 0; i < N; i++)
  375. {
  376. var j = i;
  377. var d = new SingleAssignmentDisposable();
  378. _subscriptions[j] = d;
  379. var o = new O(this, j);
  380. d.Disposable = srcs[j].SubscribeSafe(o);
  381. }
  382. return StableCompositeDisposable.Create(_subscriptions);
  383. }
  384. private void OnNext(int index, TSource value)
  385. {
  386. lock (_gate)
  387. {
  388. _values[index] = value;
  389. _hasValue[index] = true;
  390. if (_hasValueAll || (_hasValueAll = _hasValue.All(Stubs<bool>.I)))
  391. {
  392. var res = default(TResult);
  393. try
  394. {
  395. res = _parent._resultSelector(new ReadOnlyCollection<TSource>(_values));
  396. }
  397. catch (Exception ex)
  398. {
  399. base._observer.OnError(ex);
  400. base.Dispose();
  401. return;
  402. }
  403. _observer.OnNext(res);
  404. }
  405. else if (_isDone.Where((x, i) => i != index).All(Stubs<bool>.I))
  406. {
  407. base._observer.OnCompleted();
  408. base.Dispose();
  409. return;
  410. }
  411. }
  412. }
  413. private void OnError(Exception error)
  414. {
  415. lock (_gate)
  416. {
  417. base._observer.OnError(error);
  418. base.Dispose();
  419. }
  420. }
  421. private void OnCompleted(int index)
  422. {
  423. lock (_gate)
  424. {
  425. _isDone[index] = true;
  426. if (_isDone.All(Stubs<bool>.I))
  427. {
  428. base._observer.OnCompleted();
  429. base.Dispose();
  430. return;
  431. }
  432. else
  433. {
  434. _subscriptions[index].Dispose();
  435. }
  436. }
  437. }
  438. class O : IObserver<TSource>
  439. {
  440. private readonly _ _parent;
  441. private readonly int _index;
  442. public O(_ parent, int index)
  443. {
  444. _parent = parent;
  445. _index = index;
  446. }
  447. public void OnNext(TSource value)
  448. {
  449. _parent.OnNext(_index, value);
  450. }
  451. public void OnError(Exception error)
  452. {
  453. _parent.OnError(error);
  454. }
  455. public void OnCompleted()
  456. {
  457. _parent.OnCompleted(_index);
  458. }
  459. }
  460. }
  461. }
  462. #endregion
  463. }
  464. #endif