Zip.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Disposables;
  8. using System.Threading;
  9. namespace System.Reactive.Linq.ObservableImpl
  10. {
  11. #region Binary
  12. internal static class Zip<TFirst, TSecond, TResult>
  13. {
  14. internal sealed class Observable : Producer<TResult, Observable._>
  15. {
  16. private readonly IObservable<TFirst> _first;
  17. private readonly IObservable<TSecond> _second;
  18. private readonly Func<TFirst, TSecond, TResult> _resultSelector;
  19. public Observable(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  20. {
  21. _first = first;
  22. _second = second;
  23. _resultSelector = resultSelector;
  24. }
  25. protected override _ CreateSink(IObserver<TResult> observer) => new _(_resultSelector, observer);
  26. protected override void Run(_ sink) => sink.Run(_first, _second);
  27. internal sealed class _ : IdentitySink<TResult>
  28. {
  29. private readonly Func<TFirst, TSecond, TResult> _resultSelector;
  30. private readonly object _gate;
  31. private readonly FirstObserver _firstObserver;
  32. private IDisposable? _firstDisposable;
  33. private readonly SecondObserver _secondObserver;
  34. private IDisposable? _secondDisposable;
  35. public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
  36. : base(observer)
  37. {
  38. _gate = new object();
  39. _firstObserver = new FirstObserver(this);
  40. _secondObserver = new SecondObserver(this);
  41. _firstObserver.SetOther(_secondObserver);
  42. _secondObserver.SetOther(_firstObserver);
  43. _resultSelector = resultSelector;
  44. }
  45. public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
  46. {
  47. Disposable.SetSingle(ref _firstDisposable, first.SubscribeSafe(_firstObserver));
  48. Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(_secondObserver));
  49. }
  50. protected override void Dispose(bool disposing)
  51. {
  52. if (disposing)
  53. {
  54. Disposable.Dispose(ref _firstDisposable);
  55. Disposable.Dispose(ref _secondDisposable);
  56. // clearing the queue should happen under the lock
  57. // as they are plain Queue<T>s, not concurrent queues.
  58. lock (_gate)
  59. {
  60. _firstObserver.Dispose();
  61. _secondObserver.Dispose();
  62. }
  63. }
  64. base.Dispose(disposing);
  65. }
  66. private sealed class FirstObserver : IObserver<TFirst>, IDisposable
  67. {
  68. private readonly _ _parent;
  69. private readonly Queue<TFirst> _queue;
  70. private SecondObserver _other;
  71. public FirstObserver(_ parent)
  72. {
  73. _parent = parent;
  74. _queue = new Queue<TFirst>();
  75. _other = default!; // NB: Will be set by SetOther.
  76. }
  77. public void SetOther(SecondObserver other) { _other = other; }
  78. public Queue<TFirst> Queue => _queue;
  79. public bool Done { get; private set; }
  80. public void OnNext(TFirst value)
  81. {
  82. lock (_parent._gate)
  83. {
  84. if (_other.Queue.Count > 0)
  85. {
  86. var r = _other.Queue.Dequeue();
  87. TResult res;
  88. try
  89. {
  90. res = _parent._resultSelector(value, r);
  91. }
  92. catch (Exception ex)
  93. {
  94. _parent.ForwardOnError(ex);
  95. return;
  96. }
  97. _parent.ForwardOnNext(res);
  98. }
  99. else
  100. {
  101. if (_other.Done)
  102. {
  103. _parent.ForwardOnCompleted();
  104. return;
  105. }
  106. _queue.Enqueue(value);
  107. }
  108. }
  109. }
  110. public void OnError(Exception error)
  111. {
  112. lock (_parent._gate)
  113. {
  114. _parent.ForwardOnError(error);
  115. }
  116. }
  117. public void OnCompleted()
  118. {
  119. lock (_parent._gate)
  120. {
  121. Done = true;
  122. if (_other.Done)
  123. {
  124. _parent.ForwardOnCompleted();
  125. }
  126. else
  127. {
  128. Disposable.Dispose(ref _parent._firstDisposable);
  129. }
  130. }
  131. }
  132. public void Dispose()
  133. {
  134. _queue.Clear();
  135. }
  136. }
  137. private sealed class SecondObserver : IObserver<TSecond>, IDisposable
  138. {
  139. private readonly _ _parent;
  140. private readonly Queue<TSecond> _queue;
  141. private FirstObserver _other;
  142. public SecondObserver(_ parent)
  143. {
  144. _parent = parent;
  145. _queue = new Queue<TSecond>();
  146. _other = default!; // NB: Will be set by SetOther.
  147. }
  148. public void SetOther(FirstObserver other) { _other = other; }
  149. public Queue<TSecond> Queue => _queue;
  150. public bool Done { get; private set; }
  151. public void OnNext(TSecond value)
  152. {
  153. lock (_parent._gate)
  154. {
  155. if (_other.Queue.Count > 0)
  156. {
  157. var l = _other.Queue.Dequeue();
  158. TResult res;
  159. try
  160. {
  161. res = _parent._resultSelector(l, value);
  162. }
  163. catch (Exception ex)
  164. {
  165. _parent.ForwardOnError(ex);
  166. return;
  167. }
  168. _parent.ForwardOnNext(res);
  169. }
  170. else
  171. {
  172. if (_other.Done)
  173. {
  174. _parent.ForwardOnCompleted();
  175. return;
  176. }
  177. _queue.Enqueue(value);
  178. }
  179. }
  180. }
  181. public void OnError(Exception error)
  182. {
  183. lock (_parent._gate)
  184. {
  185. _parent.ForwardOnError(error);
  186. }
  187. }
  188. public void OnCompleted()
  189. {
  190. lock (_parent._gate)
  191. {
  192. Done = true;
  193. if (_other.Done)
  194. {
  195. _parent.ForwardOnCompleted();
  196. }
  197. else
  198. {
  199. Disposable.Dispose(ref _parent._secondDisposable);
  200. }
  201. }
  202. }
  203. public void Dispose()
  204. {
  205. _queue.Clear();
  206. }
  207. }
  208. }
  209. }
  210. internal sealed class Enumerable : Producer<TResult, Enumerable._>
  211. {
  212. private readonly IObservable<TFirst> _first;
  213. private readonly IEnumerable<TSecond> _second;
  214. private readonly Func<TFirst, TSecond, TResult> _resultSelector;
  215. public Enumerable(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  216. {
  217. _first = first;
  218. _second = second;
  219. _resultSelector = resultSelector;
  220. }
  221. protected override _ CreateSink(IObserver<TResult> observer) => new _(_resultSelector, observer);
  222. protected override void Run(_ sink) => sink.Run(_first, _second);
  223. internal sealed class _ : Sink<TFirst, TResult>
  224. {
  225. private readonly Func<TFirst, TSecond, TResult> _resultSelector;
  226. public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
  227. : base(observer)
  228. {
  229. _resultSelector = resultSelector;
  230. }
  231. int _enumerationInProgress;
  232. private IEnumerator<TSecond>? _rightEnumerator;
  233. private static readonly IEnumerator<TSecond> DisposedEnumerator = MakeDisposedEnumerator();
  234. private static IEnumerator<TSecond> MakeDisposedEnumerator()
  235. {
  236. yield break;
  237. }
  238. public void Run(IObservable<TFirst> first, IEnumerable<TSecond> second)
  239. {
  240. //
  241. // Notice the evaluation order of obtaining the enumerator and subscribing to the
  242. // observable sequence is reversed compared to the operator's signature. This is
  243. // required to make sure the enumerator is available as soon as the observer can
  244. // be called. Otherwise, we end up having a race for the initialization and use
  245. // of the _rightEnumerator field.
  246. //
  247. try
  248. {
  249. var enumerator = second.GetEnumerator();
  250. if (Interlocked.CompareExchange(ref _rightEnumerator, enumerator, null) != null)
  251. {
  252. enumerator.Dispose();
  253. return;
  254. }
  255. }
  256. catch (Exception exception)
  257. {
  258. ForwardOnError(exception);
  259. return;
  260. }
  261. Run(first);
  262. }
  263. protected override void Dispose(bool disposing)
  264. {
  265. if (disposing)
  266. {
  267. if (Interlocked.Increment(ref _enumerationInProgress) == 1)
  268. {
  269. Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
  270. }
  271. }
  272. base.Dispose(disposing);
  273. }
  274. public override void OnNext(TFirst value)
  275. {
  276. var currentEnumerator = Volatile.Read(ref _rightEnumerator);
  277. if (currentEnumerator == DisposedEnumerator)
  278. {
  279. return;
  280. }
  281. if (Interlocked.Increment(ref _enumerationInProgress) != 1)
  282. {
  283. return;
  284. }
  285. bool hasNext;
  286. TSecond right = default;
  287. var wasDisposed = false;
  288. try
  289. {
  290. try
  291. {
  292. hasNext = currentEnumerator!.MoveNext();
  293. if (hasNext)
  294. {
  295. right = currentEnumerator.Current;
  296. }
  297. }
  298. finally
  299. {
  300. if (Interlocked.Decrement(ref _enumerationInProgress) != 0)
  301. {
  302. Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
  303. wasDisposed = true;
  304. }
  305. }
  306. }
  307. catch (Exception ex)
  308. {
  309. ForwardOnError(ex);
  310. return;
  311. }
  312. if (wasDisposed)
  313. {
  314. return;
  315. }
  316. if (hasNext)
  317. {
  318. TResult result;
  319. try
  320. {
  321. result = _resultSelector(value, right!); // NB: Not null when hasNext is true.
  322. }
  323. catch (Exception ex)
  324. {
  325. ForwardOnError(ex);
  326. return;
  327. }
  328. ForwardOnNext(result);
  329. }
  330. else
  331. {
  332. ForwardOnCompleted();
  333. }
  334. }
  335. }
  336. }
  337. }
  338. #endregion
  339. #region [3,16]-ary
  340. #region Helpers for n-ary overloads
  341. internal interface IZip
  342. {
  343. void Next(int index);
  344. void Fail(Exception error);
  345. void Done(int index);
  346. }
  347. internal abstract class ZipSink<TResult> : IdentitySink<TResult>, IZip
  348. {
  349. protected readonly object _gate;
  350. private readonly ICollection[] _queues;
  351. private readonly bool[] _isDone;
  352. protected ZipSink(int arity, IObserver<TResult> observer)
  353. : base(observer)
  354. {
  355. _gate = new object();
  356. _isDone = new bool[arity];
  357. _queues = new ICollection[arity];
  358. }
  359. public ICollection[] Queues => _queues;
  360. public void Next(int index)
  361. {
  362. var hasValueAll = true;
  363. foreach (var queue in _queues)
  364. {
  365. if (queue.Count == 0)
  366. {
  367. hasValueAll = false;
  368. break;
  369. }
  370. }
  371. if (hasValueAll)
  372. {
  373. TResult res;
  374. try
  375. {
  376. res = GetResult();
  377. }
  378. catch (Exception ex)
  379. {
  380. ForwardOnError(ex);
  381. return;
  382. }
  383. ForwardOnNext(res);
  384. }
  385. else
  386. {
  387. var allOthersDone = true;
  388. for (var i = 0; i < _isDone.Length; i++)
  389. {
  390. if (i != index && !_isDone[i])
  391. {
  392. allOthersDone = false;
  393. break;
  394. }
  395. }
  396. if (allOthersDone)
  397. {
  398. ForwardOnCompleted();
  399. }
  400. }
  401. }
  402. protected abstract TResult GetResult();
  403. public void Fail(Exception error)
  404. {
  405. ForwardOnError(error);
  406. }
  407. public void Done(int index)
  408. {
  409. _isDone[index] = true;
  410. var allDone = true;
  411. foreach (var isDone in _isDone)
  412. {
  413. if (!isDone)
  414. {
  415. allDone = false;
  416. break;
  417. }
  418. }
  419. if (allDone)
  420. {
  421. ForwardOnCompleted();
  422. }
  423. }
  424. }
  425. internal sealed class ZipObserver<T> : SafeObserver<T>
  426. {
  427. private readonly object _gate;
  428. private readonly IZip _parent;
  429. private readonly int _index;
  430. private readonly Queue<T> _values;
  431. public ZipObserver(object gate, IZip parent, int index)
  432. {
  433. _gate = gate;
  434. _parent = parent;
  435. _index = index;
  436. _values = new Queue<T>();
  437. }
  438. public Queue<T> Values => _values;
  439. protected override void Dispose(bool disposing)
  440. {
  441. base.Dispose(disposing);
  442. if (disposing)
  443. {
  444. lock (_gate)
  445. {
  446. _values.Clear();
  447. }
  448. }
  449. }
  450. public override void OnNext(T value)
  451. {
  452. lock (_gate)
  453. {
  454. _values.Enqueue(value);
  455. _parent.Next(_index);
  456. }
  457. }
  458. public override void OnError(Exception error)
  459. {
  460. Dispose();
  461. lock (_gate)
  462. {
  463. _parent.Fail(error);
  464. }
  465. }
  466. public override void OnCompleted()
  467. {
  468. // Calling Dispose() here would clear the queue prematurely.
  469. // We only need to dispose the IDisposable of the upstream,
  470. // which is done by SafeObserver.Dispose(bool).
  471. base.Dispose(true);
  472. lock (_gate)
  473. {
  474. _parent.Done(_index);
  475. }
  476. }
  477. }
  478. #endregion
  479. #endregion
  480. #region N-ary
  481. internal sealed class Zip<TSource> : Producer<IList<TSource>, Zip<TSource>._>
  482. {
  483. private readonly IEnumerable<IObservable<TSource>> _sources;
  484. public Zip(IEnumerable<IObservable<TSource>> sources)
  485. {
  486. _sources = sources;
  487. }
  488. protected override _ CreateSink(IObserver<IList<TSource>> observer) => new _(observer);
  489. protected override void Run(_ sink) => sink.Run(_sources);
  490. internal sealed class _ : IdentitySink<IList<TSource>>
  491. {
  492. private readonly object _gate;
  493. public _(IObserver<IList<TSource>> observer)
  494. : base(observer)
  495. {
  496. _gate = new object();
  497. // NB: These will be set in Run before getting used.
  498. _queues = null!;
  499. _isDone = null!;
  500. }
  501. private Queue<TSource>[] _queues;
  502. private bool[] _isDone;
  503. private IDisposable[]? _subscriptions;
  504. public void Run(IEnumerable<IObservable<TSource>> sources)
  505. {
  506. var srcs = sources.ToArray();
  507. var N = srcs.Length;
  508. _queues = new Queue<TSource>[N];
  509. for (var i = 0; i < N; i++)
  510. {
  511. _queues[i] = new Queue<TSource>();
  512. }
  513. _isDone = new bool[N];
  514. var subscriptions = new IDisposable[N];
  515. if (Interlocked.CompareExchange(ref _subscriptions, subscriptions, null) == null)
  516. {
  517. for (var i = 0; i < N; i++)
  518. {
  519. var o = new SourceObserver(this, i);
  520. Disposable.SetSingle(ref subscriptions[i], srcs[i].SubscribeSafe(o));
  521. }
  522. }
  523. }
  524. protected override void Dispose(bool disposing)
  525. {
  526. if (disposing)
  527. {
  528. var subscriptions = Interlocked.Exchange(ref _subscriptions, Array.Empty<IDisposable>());
  529. if (subscriptions != null && subscriptions != Array.Empty<IDisposable>())
  530. {
  531. for (var i = 0; i < subscriptions.Length; i++)
  532. {
  533. Disposable.Dispose(ref subscriptions[i]);
  534. }
  535. lock (_gate)
  536. {
  537. foreach (var q in _queues)
  538. {
  539. q.Clear();
  540. }
  541. }
  542. }
  543. }
  544. base.Dispose(disposing);
  545. }
  546. private void OnNext(int index, TSource value)
  547. {
  548. lock (_gate)
  549. {
  550. _queues[index].Enqueue(value);
  551. if (_queues.All(q => q.Count > 0))
  552. {
  553. var n = _queues.Length;
  554. var res = new List<TSource>(n);
  555. for (var i = 0; i < n; i++)
  556. {
  557. res.Add(_queues[i].Dequeue());
  558. }
  559. ForwardOnNext(res);
  560. }
  561. else if (_isDone.AllExcept(index))
  562. {
  563. ForwardOnCompleted();
  564. }
  565. }
  566. }
  567. private new void OnError(Exception error)
  568. {
  569. lock (_gate)
  570. {
  571. ForwardOnError(error);
  572. }
  573. }
  574. private void OnCompleted(int index)
  575. {
  576. lock (_gate)
  577. {
  578. _isDone[index] = true;
  579. if (_isDone.All())
  580. {
  581. ForwardOnCompleted();
  582. }
  583. else
  584. {
  585. var subscriptions = Volatile.Read(ref _subscriptions);
  586. if (subscriptions != null && subscriptions != Array.Empty<IDisposable>())
  587. {
  588. Disposable.Dispose(ref subscriptions[index]);
  589. }
  590. }
  591. }
  592. }
  593. private sealed class SourceObserver : IObserver<TSource>
  594. {
  595. private readonly _ _parent;
  596. private readonly int _index;
  597. public SourceObserver(_ parent, int index)
  598. {
  599. _parent = parent;
  600. _index = index;
  601. }
  602. public void OnNext(TSource value)
  603. {
  604. _parent.OnNext(_index, value);
  605. }
  606. public void OnError(Exception error)
  607. {
  608. _parent.OnError(error);
  609. }
  610. public void OnCompleted()
  611. {
  612. _parent.OnCompleted(_index);
  613. }
  614. }
  615. }
  616. }
  617. #endregion
  618. }