Zip.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Disposables;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. #region Binary
  11. class Zip<TFirst, TSecond, TResult> : Producer<TResult>
  12. {
  13. private readonly IObservable<TFirst> _first;
  14. private readonly IObservable<TSecond> _second;
  15. private readonly IEnumerable<TSecond> _secondE;
  16. private readonly Func<TFirst, TSecond, TResult> _resultSelector;
  17. public Zip(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  18. {
  19. _first = first;
  20. _second = second;
  21. _resultSelector = resultSelector;
  22. }
  23. public Zip(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  24. {
  25. _first = first;
  26. _secondE = second;
  27. _resultSelector = resultSelector;
  28. }
  29. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  30. {
  31. if (_second != null)
  32. {
  33. var sink = new _(this, observer, cancel);
  34. setSink(sink);
  35. return sink.Run();
  36. }
  37. else
  38. {
  39. var sink = new ZipImpl(this, observer, cancel);
  40. setSink(sink);
  41. return sink.Run();
  42. }
  43. }
  44. class _ : Sink<TResult>
  45. {
  46. private readonly Zip<TFirst, TSecond, TResult> _parent;
  47. public _(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  48. : base(observer, cancel)
  49. {
  50. _parent = parent;
  51. }
  52. private object _gate;
  53. public IDisposable Run()
  54. {
  55. _gate = new object();
  56. var fstSubscription = new SingleAssignmentDisposable();
  57. var sndSubscription = new SingleAssignmentDisposable();
  58. var fstO = new F(this, fstSubscription);
  59. var sndO = new S(this, sndSubscription);
  60. fstO.Other = sndO;
  61. sndO.Other = fstO;
  62. fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);
  63. sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);
  64. return StableCompositeDisposable.Create(fstSubscription, sndSubscription, fstO, sndO);
  65. }
  66. class F : IObserver<TFirst>, IDisposable
  67. {
  68. private readonly _ _parent;
  69. private readonly IDisposable _self;
  70. private S _other;
  71. private Queue<TFirst> _queue;
  72. public F(_ parent, IDisposable self)
  73. {
  74. _parent = parent;
  75. _self = self;
  76. _queue = new Queue<TFirst>();
  77. }
  78. public S Other { set { _other = value; } }
  79. public Queue<TFirst> Queue { get { return _queue; } }
  80. public bool Done { get; private set; }
  81. public void OnNext(TFirst value)
  82. {
  83. lock (_parent._gate)
  84. {
  85. if (_other.Queue.Count > 0)
  86. {
  87. var r = _other.Queue.Dequeue();
  88. var res = default(TResult);
  89. try
  90. {
  91. res = _parent._parent._resultSelector(value, r);
  92. }
  93. catch (Exception ex)
  94. {
  95. _parent._observer.OnError(ex);
  96. _parent.Dispose();
  97. return;
  98. }
  99. _parent._observer.OnNext(res);
  100. }
  101. else
  102. {
  103. if (_other.Done)
  104. {
  105. _parent._observer.OnCompleted();
  106. _parent.Dispose();
  107. return;
  108. }
  109. _queue.Enqueue(value);
  110. }
  111. }
  112. }
  113. public void OnError(Exception error)
  114. {
  115. lock (_parent._gate)
  116. {
  117. _parent._observer.OnError(error);
  118. _parent.Dispose();
  119. }
  120. }
  121. public void OnCompleted()
  122. {
  123. lock (_parent._gate)
  124. {
  125. Done = true;
  126. if (_other.Done)
  127. {
  128. _parent._observer.OnCompleted();
  129. _parent.Dispose();
  130. return;
  131. }
  132. else
  133. {
  134. _self.Dispose();
  135. }
  136. }
  137. }
  138. public void Dispose()
  139. {
  140. _queue.Clear();
  141. }
  142. }
  143. class S : IObserver<TSecond>, IDisposable
  144. {
  145. private readonly _ _parent;
  146. private readonly IDisposable _self;
  147. private F _other;
  148. private Queue<TSecond> _queue;
  149. public S(_ parent, IDisposable self)
  150. {
  151. _parent = parent;
  152. _self = self;
  153. _queue = new Queue<TSecond>();
  154. }
  155. public F Other { set { _other = value; } }
  156. public Queue<TSecond> Queue { get { return _queue; } }
  157. public bool Done { get; private set; }
  158. public void OnNext(TSecond value)
  159. {
  160. lock (_parent._gate)
  161. {
  162. if (_other.Queue.Count > 0)
  163. {
  164. var l = _other.Queue.Dequeue();
  165. var res = default(TResult);
  166. try
  167. {
  168. res = _parent._parent._resultSelector(l, value);
  169. }
  170. catch (Exception ex)
  171. {
  172. _parent._observer.OnError(ex);
  173. _parent.Dispose();
  174. return;
  175. }
  176. _parent._observer.OnNext(res);
  177. }
  178. else
  179. {
  180. if (_other.Done)
  181. {
  182. _parent._observer.OnCompleted();
  183. _parent.Dispose();
  184. return;
  185. }
  186. _queue.Enqueue(value);
  187. }
  188. }
  189. }
  190. public void OnError(Exception error)
  191. {
  192. lock (_parent._gate)
  193. {
  194. _parent._observer.OnError(error);
  195. _parent.Dispose();
  196. }
  197. }
  198. public void OnCompleted()
  199. {
  200. lock (_parent._gate)
  201. {
  202. Done = true;
  203. if (_other.Done)
  204. {
  205. _parent._observer.OnCompleted();
  206. _parent.Dispose();
  207. return;
  208. }
  209. else
  210. {
  211. _self.Dispose();
  212. }
  213. }
  214. }
  215. public void Dispose()
  216. {
  217. _queue.Clear();
  218. }
  219. }
  220. }
  221. class ZipImpl : Sink<TResult>, IObserver<TFirst>
  222. {
  223. private readonly Zip<TFirst, TSecond, TResult> _parent;
  224. public ZipImpl(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  225. : base(observer, cancel)
  226. {
  227. _parent = parent;
  228. }
  229. private IEnumerator<TSecond> _rightEnumerator;
  230. public IDisposable Run()
  231. {
  232. //
  233. // Notice the evaluation order of obtaining the enumerator and subscribing to the
  234. // observable sequence is reversed compared to the operator's signature. This is
  235. // required to make sure the enumerator is available as soon as the observer can
  236. // be called. Otherwise, we end up having a race for the initialization and use
  237. // of the _rightEnumerator field.
  238. //
  239. try
  240. {
  241. _rightEnumerator = _parent._secondE.GetEnumerator();
  242. }
  243. catch (Exception exception)
  244. {
  245. base._observer.OnError(exception);
  246. base.Dispose();
  247. return Disposable.Empty;
  248. }
  249. var leftSubscription = _parent._first.SubscribeSafe(this);
  250. return StableCompositeDisposable.Create(leftSubscription, _rightEnumerator);
  251. }
  252. public void OnNext(TFirst value)
  253. {
  254. var hasNext = false;
  255. try
  256. {
  257. hasNext = _rightEnumerator.MoveNext();
  258. }
  259. catch (Exception ex)
  260. {
  261. base._observer.OnError(ex);
  262. base.Dispose();
  263. return;
  264. }
  265. if (hasNext)
  266. {
  267. var right = default(TSecond);
  268. try
  269. {
  270. right = _rightEnumerator.Current;
  271. }
  272. catch (Exception ex)
  273. {
  274. base._observer.OnError(ex);
  275. base.Dispose();
  276. return;
  277. }
  278. TResult result;
  279. try
  280. {
  281. result = _parent._resultSelector(value, right);
  282. }
  283. catch (Exception ex)
  284. {
  285. base._observer.OnError(ex);
  286. base.Dispose();
  287. return;
  288. }
  289. base._observer.OnNext(result);
  290. }
  291. else
  292. {
  293. base._observer.OnCompleted();
  294. base.Dispose();
  295. }
  296. }
  297. public void OnError(Exception error)
  298. {
  299. base._observer.OnError(error);
  300. base.Dispose();
  301. }
  302. public void OnCompleted()
  303. {
  304. base._observer.OnCompleted();
  305. base.Dispose();
  306. }
  307. }
  308. }
  309. #endregion
  310. #region [3,16]-ary
  311. #region Helpers for n-ary overloads
  312. interface IZip
  313. {
  314. void Next(int index);
  315. void Fail(Exception error);
  316. void Done(int index);
  317. }
  318. abstract class ZipSink<TResult> : Sink<TResult>, IZip
  319. {
  320. protected readonly object _gate;
  321. private readonly ICollection[] _queues;
  322. private readonly bool[] _isDone;
  323. public ZipSink(int arity, IObserver<TResult> observer, IDisposable cancel)
  324. : base(observer, cancel)
  325. {
  326. _gate = new object();
  327. _isDone = new bool[arity];
  328. _queues = new ICollection[arity];
  329. }
  330. public ICollection[] Queues
  331. {
  332. get { return _queues; }
  333. }
  334. public void Next(int index)
  335. {
  336. var hasValueAll = true;
  337. foreach (var queue in _queues)
  338. {
  339. if (queue.Count == 0)
  340. {
  341. hasValueAll = false;
  342. break;
  343. }
  344. }
  345. if (hasValueAll)
  346. {
  347. var res = default(TResult);
  348. try
  349. {
  350. res = GetResult();
  351. }
  352. catch (Exception ex)
  353. {
  354. base._observer.OnError(ex);
  355. base.Dispose();
  356. return;
  357. }
  358. base._observer.OnNext(res);
  359. }
  360. else
  361. {
  362. var allOthersDone = true;
  363. for (int i = 0; i < _isDone.Length; i++)
  364. {
  365. if (i != index && !_isDone[i])
  366. {
  367. allOthersDone = false;
  368. break;
  369. }
  370. }
  371. if (allOthersDone)
  372. {
  373. base._observer.OnCompleted();
  374. base.Dispose();
  375. }
  376. }
  377. }
  378. protected abstract TResult GetResult();
  379. public void Fail(Exception error)
  380. {
  381. base._observer.OnError(error);
  382. base.Dispose();
  383. }
  384. public void Done(int index)
  385. {
  386. _isDone[index] = true;
  387. var allDone = true;
  388. foreach (var isDone in _isDone)
  389. {
  390. if (!isDone)
  391. {
  392. allDone = false;
  393. break;
  394. }
  395. }
  396. if (allDone)
  397. {
  398. base._observer.OnCompleted();
  399. base.Dispose();
  400. return;
  401. }
  402. }
  403. }
  404. class ZipObserver<T> : IObserver<T>
  405. {
  406. private readonly object _gate;
  407. private readonly IZip _parent;
  408. private readonly int _index;
  409. private readonly IDisposable _self;
  410. private readonly Queue<T> _values;
  411. public ZipObserver(object gate, IZip parent, int index, IDisposable self)
  412. {
  413. _gate = gate;
  414. _parent = parent;
  415. _index = index;
  416. _self = self;
  417. _values = new Queue<T>();
  418. }
  419. public Queue<T> Values
  420. {
  421. get { return _values; }
  422. }
  423. public void OnNext(T value)
  424. {
  425. lock (_gate)
  426. {
  427. _values.Enqueue(value);
  428. _parent.Next(_index);
  429. }
  430. }
  431. public void OnError(Exception error)
  432. {
  433. _self.Dispose();
  434. lock (_gate)
  435. {
  436. _parent.Fail(error);
  437. }
  438. }
  439. public void OnCompleted()
  440. {
  441. _self.Dispose();
  442. lock (_gate)
  443. {
  444. _parent.Done(_index);
  445. }
  446. }
  447. }
  448. #endregion
  449. #endregion
  450. #region N-ary
  451. class Zip<TSource> : Producer<IList<TSource>>
  452. {
  453. private readonly IEnumerable<IObservable<TSource>> _sources;
  454. public Zip(IEnumerable<IObservable<TSource>> sources)
  455. {
  456. _sources = sources;
  457. }
  458. protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  459. {
  460. var sink = new _(this, observer, cancel);
  461. setSink(sink);
  462. return sink.Run();
  463. }
  464. class _ : Sink<IList<TSource>>
  465. {
  466. private readonly Zip<TSource> _parent;
  467. public _(Zip<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  468. : base(observer, cancel)
  469. {
  470. _parent = parent;
  471. }
  472. private object _gate;
  473. private Queue<TSource>[] _queues;
  474. private bool[] _isDone;
  475. private IDisposable[] _subscriptions;
  476. public IDisposable Run()
  477. {
  478. var srcs = _parent._sources.ToArray();
  479. var N = srcs.Length;
  480. _queues = new Queue<TSource>[N];
  481. for (int i = 0; i < N; i++)
  482. _queues[i] = new Queue<TSource>();
  483. _isDone = new bool[N];
  484. _subscriptions = new SingleAssignmentDisposable[N];
  485. _gate = new object();
  486. for (int i = 0; i < N; i++)
  487. {
  488. var j = i;
  489. var d = new SingleAssignmentDisposable();
  490. _subscriptions[j] = d;
  491. var o = new O(this, j);
  492. d.Disposable = srcs[j].SubscribeSafe(o);
  493. }
  494. return new CompositeDisposable(_subscriptions) { Disposable.Create(() => { foreach (var q in _queues) q.Clear(); }) };
  495. }
  496. private void OnNext(int index, TSource value)
  497. {
  498. lock (_gate)
  499. {
  500. _queues[index].Enqueue(value);
  501. if (_queues.All(q => q.Count > 0))
  502. {
  503. var res = _queues.Select(q => q.Dequeue()).ToList();
  504. base._observer.OnNext(res);
  505. }
  506. else if (_isDone.Where((x, i) => i != index).All(Stubs<bool>.I))
  507. {
  508. base._observer.OnCompleted();
  509. base.Dispose();
  510. return;
  511. }
  512. }
  513. }
  514. private void OnError(Exception error)
  515. {
  516. lock (_gate)
  517. {
  518. base._observer.OnError(error);
  519. base.Dispose();
  520. }
  521. }
  522. private void OnCompleted(int index)
  523. {
  524. lock (_gate)
  525. {
  526. _isDone[index] = true;
  527. if (_isDone.All(Stubs<bool>.I))
  528. {
  529. base._observer.OnCompleted();
  530. base.Dispose();
  531. return;
  532. }
  533. else
  534. {
  535. _subscriptions[index].Dispose();
  536. }
  537. }
  538. }
  539. class O : IObserver<TSource>
  540. {
  541. private readonly _ _parent;
  542. private readonly int _index;
  543. public O(_ parent, int index)
  544. {
  545. _parent = parent;
  546. _index = index;
  547. }
  548. public void OnNext(TSource value)
  549. {
  550. _parent.OnNext(_index, value);
  551. }
  552. public void OnError(Exception error)
  553. {
  554. _parent.OnError(error);
  555. }
  556. public void OnCompleted()
  557. {
  558. _parent.OnCompleted(_index);
  559. }
  560. }
  561. }
  562. }
  563. #endregion
  564. }
  565. #endif