CombineLatest.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Collections.ObjectModel;
  8. using System.Linq;
  9. using System.Reactive.Disposables;
  10. namespace System.Reactive.Linq.ObservableImpl
  11. {
  12. #region Binary
  13. class CombineLatest<TFirst, TSecond, TResult> : Producer<TResult>
  14. {
  15. private readonly IObservable<TFirst> _first;
  16. private readonly IObservable<TSecond> _second;
  17. private readonly Func<TFirst, TSecond, TResult> _resultSelector;
  18. public CombineLatest(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  19. {
  20. _first = first;
  21. _second = second;
  22. _resultSelector = resultSelector;
  23. }
  24. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  25. {
  26. var sink = new _(this, observer, cancel);
  27. setSink(sink);
  28. return sink.Run();
  29. }
  30. class _ : Sink<TResult>
  31. {
  32. private readonly CombineLatest<TFirst, TSecond, TResult> _parent;
  33. public _(CombineLatest<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  34. : base(observer, cancel)
  35. {
  36. _parent = parent;
  37. }
  38. private object _gate;
  39. public IDisposable Run()
  40. {
  41. _gate = new object();
  42. var fstSubscription = new SingleAssignmentDisposable();
  43. var sndSubscription = new SingleAssignmentDisposable();
  44. var fstO = new F(this, fstSubscription);
  45. var sndO = new S(this, sndSubscription);
  46. fstO.Other = sndO;
  47. sndO.Other = fstO;
  48. fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);
  49. sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);
  50. return StableCompositeDisposable.Create(fstSubscription, sndSubscription);
  51. }
  52. class F : IObserver<TFirst>
  53. {
  54. private readonly _ _parent;
  55. private readonly IDisposable _self;
  56. private S _other;
  57. public F(_ parent, IDisposable self)
  58. {
  59. _parent = parent;
  60. _self = self;
  61. }
  62. public S Other { set { _other = value; } }
  63. public bool HasValue { get; private set; }
  64. public TFirst Value { get; private set; }
  65. public bool Done { get; private set; }
  66. public void OnNext(TFirst value)
  67. {
  68. lock (_parent._gate)
  69. {
  70. HasValue = true;
  71. Value = value;
  72. if (_other.HasValue)
  73. {
  74. var res = default(TResult);
  75. try
  76. {
  77. res = _parent._parent._resultSelector(value, _other.Value);
  78. }
  79. catch (Exception ex)
  80. {
  81. _parent._observer.OnError(ex);
  82. _parent.Dispose();
  83. return;
  84. }
  85. _parent._observer.OnNext(res);
  86. }
  87. else if (_other.Done)
  88. {
  89. _parent._observer.OnCompleted();
  90. _parent.Dispose();
  91. return;
  92. }
  93. }
  94. }
  95. public void OnError(Exception error)
  96. {
  97. lock (_parent._gate)
  98. {
  99. _parent._observer.OnError(error);
  100. _parent.Dispose();
  101. }
  102. }
  103. public void OnCompleted()
  104. {
  105. lock (_parent._gate)
  106. {
  107. Done = true;
  108. if (_other.Done)
  109. {
  110. _parent._observer.OnCompleted();
  111. _parent.Dispose();
  112. return;
  113. }
  114. else
  115. {
  116. _self.Dispose();
  117. }
  118. }
  119. }
  120. }
  121. class S : IObserver<TSecond>
  122. {
  123. private readonly _ _parent;
  124. private readonly IDisposable _self;
  125. private F _other;
  126. public S(_ parent, IDisposable self)
  127. {
  128. _parent = parent;
  129. _self = self;
  130. }
  131. public F Other { set { _other = value; } }
  132. public bool HasValue { get; private set; }
  133. public TSecond Value { get; private set; }
  134. public bool Done { get; private set; }
  135. public void OnNext(TSecond value)
  136. {
  137. lock (_parent._gate)
  138. {
  139. HasValue = true;
  140. Value = value;
  141. if (_other.HasValue)
  142. {
  143. var res = default(TResult);
  144. try
  145. {
  146. res = _parent._parent._resultSelector(_other.Value, value);
  147. }
  148. catch (Exception ex)
  149. {
  150. _parent._observer.OnError(ex);
  151. _parent.Dispose();
  152. return;
  153. }
  154. _parent._observer.OnNext(res);
  155. }
  156. else if (_other.Done)
  157. {
  158. _parent._observer.OnCompleted();
  159. _parent.Dispose();
  160. return;
  161. }
  162. }
  163. }
  164. public void OnError(Exception error)
  165. {
  166. lock (_parent._gate)
  167. {
  168. _parent._observer.OnError(error);
  169. _parent.Dispose();
  170. }
  171. }
  172. public void OnCompleted()
  173. {
  174. lock (_parent._gate)
  175. {
  176. Done = true;
  177. if (_other.Done)
  178. {
  179. _parent._observer.OnCompleted();
  180. _parent.Dispose();
  181. return;
  182. }
  183. else
  184. {
  185. _self.Dispose();
  186. }
  187. }
  188. }
  189. }
  190. }
  191. }
  192. #endregion
  193. #region [3,16]-ary
  194. #region Helpers for n-ary overloads
  195. interface ICombineLatest
  196. {
  197. void Next(int index);
  198. void Fail(Exception error);
  199. void Done(int index);
  200. }
  201. abstract class CombineLatestSink<TResult> : Sink<TResult>, ICombineLatest
  202. {
  203. protected readonly object _gate;
  204. private bool _hasValueAll;
  205. private readonly bool[] _hasValue;
  206. private readonly bool[] _isDone;
  207. public CombineLatestSink(int arity, IObserver<TResult> observer, IDisposable cancel)
  208. : base(observer, cancel)
  209. {
  210. _gate = new object();
  211. _hasValue = new bool[arity];
  212. _isDone = new bool[arity];
  213. }
  214. public void Next(int index)
  215. {
  216. if (!_hasValueAll)
  217. {
  218. _hasValue[index] = true;
  219. var hasValueAll = true;
  220. foreach (var hasValue in _hasValue)
  221. {
  222. if (!hasValue)
  223. {
  224. hasValueAll = false;
  225. break;
  226. }
  227. }
  228. _hasValueAll = hasValueAll;
  229. }
  230. if (_hasValueAll)
  231. {
  232. var res = default(TResult);
  233. try
  234. {
  235. res = GetResult();
  236. }
  237. catch (Exception ex)
  238. {
  239. base._observer.OnError(ex);
  240. base.Dispose();
  241. return;
  242. }
  243. base._observer.OnNext(res);
  244. }
  245. else
  246. {
  247. var allOthersDone = true;
  248. for (int i = 0; i < _isDone.Length; i++)
  249. {
  250. if (i != index && !_isDone[i])
  251. {
  252. allOthersDone = false;
  253. break;
  254. }
  255. }
  256. if (allOthersDone)
  257. {
  258. base._observer.OnCompleted();
  259. base.Dispose();
  260. }
  261. }
  262. }
  263. protected abstract TResult GetResult();
  264. public void Fail(Exception error)
  265. {
  266. base._observer.OnError(error);
  267. base.Dispose();
  268. }
  269. public void Done(int index)
  270. {
  271. _isDone[index] = true;
  272. var allDone = true;
  273. foreach (var isDone in _isDone)
  274. {
  275. if (!isDone)
  276. {
  277. allDone = false;
  278. break;
  279. }
  280. }
  281. if (allDone)
  282. {
  283. base._observer.OnCompleted();
  284. base.Dispose();
  285. return;
  286. }
  287. }
  288. }
  289. class CombineLatestObserver<T> : IObserver<T>
  290. {
  291. private readonly object _gate;
  292. private readonly ICombineLatest _parent;
  293. private readonly int _index;
  294. private readonly IDisposable _self;
  295. private T _value;
  296. public CombineLatestObserver(object gate, ICombineLatest parent, int index, IDisposable self)
  297. {
  298. _gate = gate;
  299. _parent = parent;
  300. _index = index;
  301. _self = self;
  302. }
  303. public T Value
  304. {
  305. get { return _value; }
  306. }
  307. public void OnNext(T value)
  308. {
  309. lock (_gate)
  310. {
  311. _value = value;
  312. _parent.Next(_index);
  313. }
  314. }
  315. public void OnError(Exception error)
  316. {
  317. _self.Dispose();
  318. lock (_gate)
  319. {
  320. _parent.Fail(error);
  321. }
  322. }
  323. public void OnCompleted()
  324. {
  325. _self.Dispose();
  326. lock (_gate)
  327. {
  328. _parent.Done(_index);
  329. }
  330. }
  331. }
  332. #endregion
  333. #endregion
  334. #region N-ary
  335. class CombineLatest<TSource, TResult> : Producer<TResult>
  336. {
  337. private readonly IEnumerable<IObservable<TSource>> _sources;
  338. private readonly Func<IList<TSource>, TResult> _resultSelector;
  339. public CombineLatest(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
  340. {
  341. _sources = sources;
  342. _resultSelector = resultSelector;
  343. }
  344. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  345. {
  346. var sink = new _(this, observer, cancel);
  347. setSink(sink);
  348. return sink.Run();
  349. }
  350. class _ : Sink<TResult>
  351. {
  352. private readonly CombineLatest<TSource, TResult> _parent;
  353. public _(CombineLatest<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  354. : base(observer, cancel)
  355. {
  356. _parent = parent;
  357. }
  358. private object _gate;
  359. private bool[] _hasValue;
  360. private bool _hasValueAll;
  361. private List<TSource> _values;
  362. private bool[] _isDone;
  363. private IDisposable[] _subscriptions;
  364. public IDisposable Run()
  365. {
  366. var srcs = _parent._sources.ToArray();
  367. var N = srcs.Length;
  368. _hasValue = new bool[N];
  369. _hasValueAll = false;
  370. _values = new List<TSource>(N);
  371. for (int i = 0; i < N; i++)
  372. _values.Add(default(TSource));
  373. _isDone = new bool[N];
  374. _subscriptions = new IDisposable[N];
  375. _gate = new object();
  376. for (int i = 0; i < N; i++)
  377. {
  378. var j = i;
  379. var d = new SingleAssignmentDisposable();
  380. _subscriptions[j] = d;
  381. var o = new O(this, j);
  382. d.Disposable = srcs[j].SubscribeSafe(o);
  383. }
  384. return StableCompositeDisposable.Create(_subscriptions);
  385. }
  386. private void OnNext(int index, TSource value)
  387. {
  388. lock (_gate)
  389. {
  390. _values[index] = value;
  391. _hasValue[index] = true;
  392. if (_hasValueAll || (_hasValueAll = _hasValue.All(Stubs<bool>.I)))
  393. {
  394. var res = default(TResult);
  395. try
  396. {
  397. res = _parent._resultSelector(new ReadOnlyCollection<TSource>(_values));
  398. }
  399. catch (Exception ex)
  400. {
  401. base._observer.OnError(ex);
  402. base.Dispose();
  403. return;
  404. }
  405. _observer.OnNext(res);
  406. }
  407. else if (_isDone.Where((x, i) => i != index).All(Stubs<bool>.I))
  408. {
  409. base._observer.OnCompleted();
  410. base.Dispose();
  411. return;
  412. }
  413. }
  414. }
  415. private void OnError(Exception error)
  416. {
  417. lock (_gate)
  418. {
  419. base._observer.OnError(error);
  420. base.Dispose();
  421. }
  422. }
  423. private void OnCompleted(int index)
  424. {
  425. lock (_gate)
  426. {
  427. _isDone[index] = true;
  428. if (_isDone.All(Stubs<bool>.I))
  429. {
  430. base._observer.OnCompleted();
  431. base.Dispose();
  432. return;
  433. }
  434. else
  435. {
  436. _subscriptions[index].Dispose();
  437. }
  438. }
  439. }
  440. class O : IObserver<TSource>
  441. {
  442. private readonly _ _parent;
  443. private readonly int _index;
  444. public O(_ parent, int index)
  445. {
  446. _parent = parent;
  447. _index = index;
  448. }
  449. public void OnNext(TSource value)
  450. {
  451. _parent.OnNext(_index, value);
  452. }
  453. public void OnError(Exception error)
  454. {
  455. _parent.OnError(error);
  456. }
  457. public void OnCompleted()
  458. {
  459. _parent.OnCompleted(_index);
  460. }
  461. }
  462. }
  463. }
  464. #endregion
  465. }
  466. #endif