Zip.cs 19 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Collections;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Reactive.Disposables;
  10. namespace System.Reactive.Linq.ObservableImpl
  11. {
  12. #region Binary
  /// <summary>
  /// Producer that combines the elements of a first observable sequence pairwise with the
  /// elements of a second source, which is either an observable or an enumerable sequence,
  /// and projects every pair through a result selector.
  /// </summary>
  class Zip<TFirst, TSecond, TResult> : Producer<TResult>
  {
      private readonly IObservable<TFirst> _first;
      private readonly IObservable<TSecond> _second;
      private readonly IEnumerable<TSecond> _secondE;
      private readonly Func<TFirst, TSecond, TResult> _resultSelector;

      /// <summary>
      /// Creates a zip of two observable sequences.
      /// </summary>
      public Zip(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
      {
          _first = first;
          _second = second;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates a zip of an observable sequence with an enumerable sequence.
      /// </summary>
      public Zip(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
      {
          _first = first;
          _secondE = second;
          _resultSelector = resultSelector;
      }

      /// <summary>
      /// Creates the sink matching the kind of second source, hands it to
      /// <paramref name="setSink"/> and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          // _second is only assigned by the observable/observable constructor; when it is
          // null the observable/enumerable sink is used.
          if (_second == null)
          {
              var enumerableSink = new ZipImpl(this, observer, cancel);
              setSink(enumerableSink);
              return enumerableSink.Run();
          }

          var observableSink = new _(this, observer, cancel);
          setSink(observableSink);
          return observableSink.Run();
      }

      /// <summary>
      /// Sink for the observable/observable case. Both inner observers share one gate and
      /// each keeps a queue of values that have not been paired yet.
      /// </summary>
      class _ : Sink<TResult>
      {
          private readonly Zip<TFirst, TSecond, TResult> _parent;
          private object _gate;

          public _(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Wires up both inner observers and subscribes to the first source, then the second.
          /// </summary>
          /// <returns>A disposable covering both subscriptions and both inner observers.</returns>
          public IDisposable Run()
          {
              _gate = new object();

              var firstHandle = new SingleAssignmentDisposable();
              var secondHandle = new SingleAssignmentDisposable();

              var firstObserver = new F(this, firstHandle);
              var secondObserver = new S(this, secondHandle);

              // Each observer needs to look into the other one's queue and completion flag.
              firstObserver.Other = secondObserver;
              secondObserver.Other = firstObserver;

              firstHandle.Disposable = _parent._first.SubscribeSafe(firstObserver);
              secondHandle.Disposable = _parent._second.SubscribeSafe(secondObserver);

              return StableCompositeDisposable.Create(firstHandle, secondHandle, firstObserver, secondObserver);
          }

          /// <summary>
          /// Observer for the first source. Disposing it clears its queue.
          /// </summary>
          class F : IObserver<TFirst>, IDisposable
          {
              private readonly _ _parent;
              private readonly IDisposable _self;
              private S _other;
              private Queue<TFirst> _queue;

              public F(_ parent, IDisposable self)
              {
                  _parent = parent;
                  _self = self;
                  _queue = new Queue<TFirst>();
              }

              public S Other { set { _other = value; } }

              public Queue<TFirst> Queue { get { return _queue; } }

              public bool Done { get; private set; }

              public void OnNext(TFirst value)
              {
                  lock (_parent._gate)
                  {
                      var pending = _other.Queue;

                      if (pending.Count == 0)
                      {
                          // Nothing to pair with: complete if the other side has finished,
                          // otherwise park the value until the other side produces one.
                          if (_other.Done)
                          {
                              _parent._observer.OnCompleted();
                              _parent.Dispose();
                          }
                          else
                          {
                              _queue.Enqueue(value);
                          }

                          return;
                      }

                      var partner = pending.Dequeue();

                      TResult combined;
                      try
                      {
                          combined = _parent._parent._resultSelector(value, partner);
                      }
                      catch (Exception ex)
                      {
                          _parent._observer.OnError(ex);
                          _parent.Dispose();
                          return;
                      }

                      // Kept outside the try block so only selector failures are caught.
                      _parent._observer.OnNext(combined);
                  }
              }

              public void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent._observer.OnError(error);
                      _parent.Dispose();
                  }
              }

              public void OnCompleted()
              {
                  lock (_parent._gate)
                  {
                      Done = true;

                      if (_other.Done)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                      else
                      {
                          // Only this side is finished; drop just this subscription.
                          _self.Dispose();
                      }
                  }
              }

              public void Dispose()
              {
                  _queue.Clear();
              }
          }

          /// <summary>
          /// Observer for the second source. Disposing it clears its queue.
          /// </summary>
          class S : IObserver<TSecond>, IDisposable
          {
              private readonly _ _parent;
              private readonly IDisposable _self;
              private F _other;
              private Queue<TSecond> _queue;

              public S(_ parent, IDisposable self)
              {
                  _parent = parent;
                  _self = self;
                  _queue = new Queue<TSecond>();
              }

              public F Other { set { _other = value; } }

              public Queue<TSecond> Queue { get { return _queue; } }

              public bool Done { get; private set; }

              public void OnNext(TSecond value)
              {
                  lock (_parent._gate)
                  {
                      var pending = _other.Queue;

                      if (pending.Count == 0)
                      {
                          // Nothing to pair with: complete if the other side has finished,
                          // otherwise park the value until the other side produces one.
                          if (_other.Done)
                          {
                              _parent._observer.OnCompleted();
                              _parent.Dispose();
                          }
                          else
                          {
                              _queue.Enqueue(value);
                          }

                          return;
                      }

                      var partner = pending.Dequeue();

                      TResult combined;
                      try
                      {
                          // The element of the first source is always the first argument.
                          combined = _parent._parent._resultSelector(partner, value);
                      }
                      catch (Exception ex)
                      {
                          _parent._observer.OnError(ex);
                          _parent.Dispose();
                          return;
                      }

                      // Kept outside the try block so only selector failures are caught.
                      _parent._observer.OnNext(combined);
                  }
              }

              public void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent._observer.OnError(error);
                      _parent.Dispose();
                  }
              }

              public void OnCompleted()
              {
                  lock (_parent._gate)
                  {
                      Done = true;

                      if (_other.Done)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                      else
                      {
                          // Only this side is finished; drop just this subscription.
                          _self.Dispose();
                      }
                  }
              }

              public void Dispose()
              {
                  _queue.Clear();
              }
          }
      }

      /// <summary>
      /// Sink for the observable/enumerable case. Every element of the observable source
      /// advances the enumerator by one step.
      /// </summary>
      class ZipImpl : Sink<TResult>, IObserver<TFirst>
      {
          private readonly Zip<TFirst, TSecond, TResult> _parent;
          private IEnumerator<TSecond> _rightEnumerator;

          public ZipImpl(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Obtains the enumerator and then subscribes to the observable source.
          /// </summary>
          /// <returns>
          /// A disposable covering the subscription and the enumerator, or an empty disposable
          /// when obtaining the enumerator failed.
          /// </returns>
          public IDisposable Run()
          {
              //
              // The enumerator is deliberately acquired before subscribing, which is the
              // reverse of the order in the operator's signature. The observer may be called
              // as soon as the subscription exists, so _rightEnumerator has to be assigned
              // by then; otherwise its initialization would race with its use.
              //
              try
              {
                  _rightEnumerator = _parent._secondE.GetEnumerator();
              }
              catch (Exception exception)
              {
                  _observer.OnError(exception);
                  base.Dispose();
                  return Disposable.Empty;
              }

              var leftSubscription = _parent._first.SubscribeSafe(this);

              return StableCompositeDisposable.Create(leftSubscription, _rightEnumerator);
          }

          public void OnNext(TFirst value)
          {
              var hasNext = false;
              var result = default(TResult);

              // Advancing the enumerator, reading its current element and running the selector
              // all fail the same way: forward the error and shut down.
              try
              {
                  hasNext = _rightEnumerator.MoveNext();
                  if (hasNext)
                  {
                      var right = _rightEnumerator.Current;
                      result = _parent._resultSelector(value, right);
                  }
              }
              catch (Exception ex)
              {
                  _observer.OnError(ex);
                  base.Dispose();
                  return;
              }

              if (!hasNext)
              {
                  // The enumerable ran out of elements, so no further pairs are possible.
                  _observer.OnCompleted();
                  base.Dispose();
                  return;
              }

              _observer.OnNext(result);
          }

          public void OnError(Exception error)
          {
              _observer.OnError(error);
              base.Dispose();
          }

          public void OnCompleted()
          {
              _observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  311. #endregion
  312. #region [3,16]-ary
  313. #region Helpers for n-ary overloads
  /// <summary>
  /// Callbacks through which the per-source observers of an n-ary zip report to their
  /// shared sink. ZipObserver&lt;T&gt; invokes all of them while holding the shared gate.
  /// </summary>
  interface IZip
  {
      /// <summary>
      /// Reports that the source at position <paramref name="index"/> has queued a value.
      /// </summary>
      void Next(int index);

      /// <summary>
      /// Reports that a source terminated with <paramref name="error"/>.
      /// </summary>
      void Fail(Exception error);

      /// <summary>
      /// Reports that the source at position <paramref name="index"/> has completed.
      /// </summary>
      void Done(int index);
  }
  /// <summary>
  /// Base sink for the n-ary zip overloads. It tracks one queue and one completion flag
  /// per source; derived classes supply the queues through <see cref="Queues"/> and build
  /// the combined element in <see cref="GetResult"/>.
  /// </summary>
  abstract class ZipSink<TResult> : Sink<TResult>, IZip
  {
      protected readonly object _gate;

      private readonly ICollection[] _queues;
      private readonly bool[] _isDone;

      /// <summary>
      /// Allocates the gate plus <paramref name="arity"/> queue slots and completion flags.
      /// The queue slots start out empty (null) and are filled in through <see cref="Queues"/>.
      /// </summary>
      public ZipSink(int arity, IObserver<TResult> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _gate = new object();
          _isDone = new bool[arity];
          _queues = new ICollection[arity];
      }

      /// <summary>
      /// The per-source queues, indexed by source position.
      /// </summary>
      public ICollection[] Queues
      {
          get { return _queues; }
      }

      /// <summary>
      /// Handles a new value from the source at <paramref name="index"/>: emits a combined
      /// element when every queue holds a value, and otherwise completes when all the other
      /// sources are already done.
      /// </summary>
      public void Next(int index)
      {
          if (EveryQueueHasValue())
          {
              var res = default(TResult);
              try
              {
                  res = GetResult();
              }
              catch (Exception ex)
              {
                  base._observer.OnError(ex);
                  base.Dispose();
                  return;
              }

              base._observer.OnNext(res);
              return;
          }

          if (AllOthersDone(index))
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }

      /// <summary>
      /// Builds the combined element; only called when every queue holds at least one value.
      /// </summary>
      protected abstract TResult GetResult();

      /// <summary>
      /// Forwards <paramref name="error"/> to the observer and disposes the sink.
      /// </summary>
      public void Fail(Exception error)
      {
          base._observer.OnError(error);
          base.Dispose();
      }

      /// <summary>
      /// Marks the source at <paramref name="index"/> as completed and completes the
      /// observer once every source has been marked.
      /// </summary>
      public void Done(int index)
      {
          _isDone[index] = true;

          for (var i = 0; i < _isDone.Length; i++)
          {
              if (!_isDone[i])
              {
                  // At least one source is still running.
                  return;
              }
          }

          base._observer.OnCompleted();
          base.Dispose();
      }

      // True when none of the queues is empty.
      private bool EveryQueueHasValue()
      {
          for (var i = 0; i < _queues.Length; i++)
          {
              if (_queues[i].Count == 0)
              {
                  return false;
              }
          }

          return true;
      }

      // True when every source other than the one at `index` has completed.
      private bool AllOthersDone(int index)
      {
          for (var i = 0; i < _isDone.Length; i++)
          {
              if (i != index && !_isDone[i])
              {
                  return false;
              }
          }

          return true;
      }
  }
  /// <summary>
  /// Observer for one source of an n-ary zip. It queues incoming values and reports
  /// values, errors and completion to the shared <see cref="IZip"/> parent under the gate.
  /// </summary>
  class ZipObserver<T> : IObserver<T>
  {
      private readonly object _gate;
      private readonly IZip _parent;
      private readonly int _index;
      private readonly IDisposable _self;
      private readonly Queue<T> _values;

      /// <summary>
      /// Creates the observer for the source at position <paramref name="index"/>.
      /// </summary>
      /// <param name="gate">Lock object held around every call into <paramref name="parent"/>.</param>
      /// <param name="parent">Receiver of the notifications.</param>
      /// <param name="index">Position passed along to <paramref name="parent"/>.</param>
      /// <param name="self">Disposed when this observer receives an error or a completion.</param>
      public ZipObserver(object gate, IZip parent, int index, IDisposable self)
      {
          _gate = gate;
          _parent = parent;
          _index = index;
          _self = self;
          _values = new Queue<T>();
      }

      /// <summary>
      /// Values received so far from this source.
      /// </summary>
      public Queue<T> Values
      {
          get { return _values; }
      }

      public void OnNext(T value)
      {
          lock (_gate)
          {
              // Queue first so the parent sees the value when it inspects the queues.
              _values.Enqueue(value);
              _parent.Next(_index);
          }
      }

      public void OnError(Exception error)
      {
          // _self is disposed before the gate is taken.
          _self.Dispose();

          lock (_gate)
          {
              _parent.Fail(error);
          }
      }

      public void OnCompleted()
      {
          // _self is disposed before the gate is taken.
          _self.Dispose();

          lock (_gate)
          {
              _parent.Done(_index);
          }
      }
  }
  450. #endregion
  451. #endregion
  452. #region N-ary
  /// <summary>
  /// Producer that zips a collection of observable sequences of the same element type,
  /// emitting lists that hold one element from each source, in source order.
  /// </summary>
  class Zip<TSource> : Producer<IList<TSource>>
  {
      private readonly IEnumerable<IObservable<TSource>> _sources;

      public Zip(IEnumerable<IObservable<TSource>> sources)
      {
          _sources = sources;
      }

      /// <summary>
      /// Creates the sink, hands it to <paramref name="setSink"/> and starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var sink = new _(this, observer, cancel);
          setSink(sink);
          return sink.Run();
      }

      /// <summary>
      /// Sink holding one queue, one completion flag and one subscription per source.
      /// </summary>
      class _ : Sink<IList<TSource>>
      {
          private readonly Zip<TSource> _parent;

          private object _gate;
          private Queue<TSource>[] _queues;
          private bool[] _isDone;
          private IDisposable[] _subscriptions;

          public _(Zip<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Materializes the sources, sets up the per-source state and subscribes to each
          /// source in order.
          /// </summary>
          /// <returns>
          /// A disposable covering every subscription plus an action that clears all queues.
          /// </returns>
          public IDisposable Run()
          {
              var sources = _parent._sources.ToArray();
              var count = sources.Length;

              // All shared state is in place before the first subscription is made.
              _gate = new object();
              _isDone = new bool[count];
              _subscriptions = new SingleAssignmentDisposable[count];
              _queues = new Queue<TSource>[count];
              for (var i = 0; i < count; i++)
              {
                  _queues[i] = new Queue<TSource>();
              }

              for (var i = 0; i < count; i++)
              {
                  var handle = new SingleAssignmentDisposable();

                  // Stored before subscribing so OnCompleted can find it if the source
                  // completes during the Subscribe call.
                  _subscriptions[i] = handle;

                  handle.Disposable = sources[i].SubscribeSafe(new O(this, i));
              }

              var all = new CompositeDisposable(_subscriptions);
              all.Add(Disposable.Create(ClearQueues));
              return all;
          }

          // Drops every value that is still waiting to be paired.
          private void ClearQueues()
          {
              foreach (var queue in _queues)
              {
                  queue.Clear();
              }
          }

          // True when none of the queues is empty.
          private bool EveryQueueHasValue()
          {
              foreach (var queue in _queues)
              {
                  if (queue.Count <= 0)
                  {
                      return false;
                  }
              }

              return true;
          }

          // True when every source other than the one at `index` has completed.
          private bool AllOthersDone(int index)
          {
              for (var i = 0; i < _isDone.Length; i++)
              {
                  if (i != index && !_isDone[i])
                  {
                      return false;
                  }
              }

              return true;
          }

          private void OnNext(int index, TSource value)
          {
              lock (_gate)
              {
                  _queues[index].Enqueue(value);

                  if (EveryQueueHasValue())
                  {
                      // Take the head of every queue, in source order.
                      var res = new List<TSource>(_queues.Length);
                      foreach (var queue in _queues)
                      {
                          res.Add(queue.Dequeue());
                      }

                      _observer.OnNext(res);
                  }
                  else if (AllOthersDone(index))
                  {
                      _observer.OnCompleted();
                      base.Dispose();
                  }
              }
          }

          private void OnError(Exception error)
          {
              lock (_gate)
              {
                  _observer.OnError(error);
                  base.Dispose();
              }
          }

          private void OnCompleted(int index)
          {
              lock (_gate)
              {
                  _isDone[index] = true;

                  // The flag for `index` was just set, so "all others done" means "all done".
                  if (AllOthersDone(index))
                  {
                      _observer.OnCompleted();
                      base.Dispose();
                  }
                  else
                  {
                      // Other sources are still running; release only this subscription.
                      _subscriptions[index].Dispose();
                  }
              }
          }

          /// <summary>
          /// Observer that forwards every notification to the sink, tagged with the
          /// position of its source.
          /// </summary>
          class O : IObserver<TSource>
          {
              private readonly _ _parent;
              private readonly int _index;

              public O(_ parent, int index)
              {
                  _parent = parent;
                  _index = index;
              }

              public void OnNext(TSource value)
              {
                  _parent.OnNext(_index, value);
              }

              public void OnError(Exception error)
              {
                  _parent.OnError(error);
              }

              public void OnCompleted()
              {
                  _parent.OnCompleted(_index);
              }
          }
      }
  }
  565. #endregion
  566. }
  567. #endif