Buffer.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. namespace System.Reactive.Linq.ObservableImpl
  8. {
  9. internal sealed class Buffer<TSource> : Producer<IList<TSource>>
  10. {
  11. private readonly IObservable<TSource> _source;
  12. private readonly int _count;
  13. private readonly int _skip;
  14. private readonly TimeSpan _timeSpan;
  15. private readonly TimeSpan _timeShift;
  16. private readonly IScheduler _scheduler;
  17. public Buffer(IObservable<TSource> source, int count, int skip)
  18. {
  19. _source = source;
  20. _count = count;
  21. _skip = skip;
  22. }
  23. public Buffer(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  24. {
  25. _source = source;
  26. _timeSpan = timeSpan;
  27. _timeShift = timeShift;
  28. _scheduler = scheduler;
  29. }
  30. public Buffer(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  31. {
  32. _source = source;
  33. _timeSpan = timeSpan;
  34. _count = count;
  35. _scheduler = scheduler;
  36. }
  37. protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  38. {
  39. if (_scheduler == null)
  40. {
  41. var sink = new _(this, observer, cancel);
  42. setSink(sink);
  43. return sink.Run();
  44. }
  45. else if (_count > 0)
  46. {
  47. var sink = new Impl(this, observer, cancel);
  48. setSink(sink);
  49. return sink.Run();
  50. }
  51. else
  52. {
  53. if (_timeSpan == _timeShift)
  54. {
  55. var sink = new BufferTimeShift(this, observer, cancel);
  56. setSink(sink);
  57. return sink.Run();
  58. }
  59. else
  60. {
  61. var sink = new BufferImpl(this, observer, cancel);
  62. setSink(sink);
  63. return sink.Run();
  64. }
  65. }
  66. }
  67. class _ : Sink<IList<TSource>>, IObserver<TSource>
  68. {
  69. private readonly Buffer<TSource> _parent;
  70. public _(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  71. : base(observer, cancel)
  72. {
  73. _parent = parent;
  74. }
  75. private Queue<IList<TSource>> _queue;
  76. private int _n;
  77. public IDisposable Run()
  78. {
  79. _queue = new Queue<IList<TSource>>();
  80. _n = 0;
  81. CreateWindow();
  82. return _parent._source.SubscribeSafe(this);
  83. }
  84. private void CreateWindow()
  85. {
  86. var s = new List<TSource>();
  87. _queue.Enqueue(s);
  88. }
  89. public void OnNext(TSource value)
  90. {
  91. foreach (var s in _queue)
  92. s.Add(value);
  93. var c = _n - _parent._count + 1;
  94. if (c >= 0 && c % _parent._skip == 0)
  95. {
  96. var s = _queue.Dequeue();
  97. if (s.Count > 0)
  98. base._observer.OnNext(s);
  99. }
  100. _n++;
  101. if (_n % _parent._skip == 0)
  102. CreateWindow();
  103. }
  104. public void OnError(Exception error)
  105. {
  106. while (_queue.Count > 0)
  107. _queue.Dequeue().Clear();
  108. base._observer.OnError(error);
  109. base.Dispose();
  110. }
  111. public void OnCompleted()
  112. {
  113. while (_queue.Count > 0)
  114. {
  115. var s = _queue.Dequeue();
  116. if (s.Count > 0)
  117. base._observer.OnNext(s);
  118. }
  119. base._observer.OnCompleted();
  120. base.Dispose();
  121. }
  122. }
  123. class BufferImpl : Sink<IList<TSource>>, IObserver<TSource>
  124. {
  125. private readonly Buffer<TSource> _parent;
  126. public BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  127. : base(observer, cancel)
  128. {
  129. _parent = parent;
  130. }
  131. private TimeSpan _totalTime;
  132. private TimeSpan _nextShift;
  133. private TimeSpan _nextSpan;
  134. private object _gate;
  135. private Queue<List<TSource>> _q;
  136. private SerialDisposable _timerD;
  137. public IDisposable Run()
  138. {
  139. _totalTime = TimeSpan.Zero;
  140. _nextShift = _parent._timeShift;
  141. _nextSpan = _parent._timeSpan;
  142. _gate = new object();
  143. _q = new Queue<List<TSource>>();
  144. _timerD = new SerialDisposable();
  145. CreateWindow();
  146. CreateTimer();
  147. var subscription = _parent._source.SubscribeSafe(this);
  148. return StableCompositeDisposable.Create(_timerD, subscription);
  149. }
  150. private void CreateWindow()
  151. {
  152. var s = new List<TSource>();
  153. _q.Enqueue(s);
  154. }
  155. private void CreateTimer()
  156. {
  157. var m = new SingleAssignmentDisposable();
  158. _timerD.Disposable = m;
  159. var isSpan = false;
  160. var isShift = false;
  161. if (_nextSpan == _nextShift)
  162. {
  163. isSpan = true;
  164. isShift = true;
  165. }
  166. else if (_nextSpan < _nextShift)
  167. isSpan = true;
  168. else
  169. isShift = true;
  170. var newTotalTime = isSpan ? _nextSpan : _nextShift;
  171. var ts = newTotalTime - _totalTime;
  172. _totalTime = newTotalTime;
  173. if (isSpan)
  174. _nextSpan += _parent._timeShift;
  175. if (isShift)
  176. _nextShift += _parent._timeShift;
  177. m.Disposable = _parent._scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
  178. }
  179. struct State
  180. {
  181. public bool isSpan;
  182. public bool isShift;
  183. }
  184. private IDisposable Tick(IScheduler self, State state)
  185. {
  186. lock (_gate)
  187. {
  188. //
  189. // Before v2, the two operations below were reversed. This doesn't have an observable
  190. // difference for Buffer, but is done to keep code consistent with Window, where we
  191. // took a breaking change in v2 to ensure consistency across overloads. For more info,
  192. // see the comment in Tick for Window.
  193. //
  194. if (state.isSpan)
  195. {
  196. var s = _q.Dequeue();
  197. base._observer.OnNext(s);
  198. }
  199. if (state.isShift)
  200. {
  201. CreateWindow();
  202. }
  203. }
  204. CreateTimer();
  205. return Disposable.Empty;
  206. }
  207. public void OnNext(TSource value)
  208. {
  209. lock (_gate)
  210. {
  211. foreach (var s in _q)
  212. s.Add(value);
  213. }
  214. }
  215. public void OnError(Exception error)
  216. {
  217. lock (_gate)
  218. {
  219. while (_q.Count > 0)
  220. _q.Dequeue().Clear();
  221. base._observer.OnError(error);
  222. base.Dispose();
  223. }
  224. }
  225. public void OnCompleted()
  226. {
  227. lock (_gate)
  228. {
  229. while (_q.Count > 0)
  230. base._observer.OnNext(_q.Dequeue());
  231. base._observer.OnCompleted();
  232. base.Dispose();
  233. }
  234. }
  235. }
  236. class BufferTimeShift : Sink<IList<TSource>>, IObserver<TSource>
  237. {
  238. private readonly Buffer<TSource> _parent;
  239. public BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  240. : base(observer, cancel)
  241. {
  242. _parent = parent;
  243. }
  244. private object _gate;
  245. private List<TSource> _list;
  246. public IDisposable Run()
  247. {
  248. _gate = new object();
  249. _list = new List<TSource>();
  250. var d = _parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick);
  251. var s = _parent._source.SubscribeSafe(this);
  252. return StableCompositeDisposable.Create(d, s);
  253. }
  254. private void Tick()
  255. {
  256. lock (_gate)
  257. {
  258. base._observer.OnNext(_list);
  259. _list = new List<TSource>();
  260. }
  261. }
  262. public void OnNext(TSource value)
  263. {
  264. lock (_gate)
  265. {
  266. _list.Add(value);
  267. }
  268. }
  269. public void OnError(Exception error)
  270. {
  271. lock (_gate)
  272. {
  273. _list.Clear();
  274. base._observer.OnError(error);
  275. base.Dispose();
  276. }
  277. }
  278. public void OnCompleted()
  279. {
  280. lock (_gate)
  281. {
  282. base._observer.OnNext(_list);
  283. base._observer.OnCompleted();
  284. base.Dispose();
  285. }
  286. }
  287. }
  288. class Impl : Sink<IList<TSource>>, IObserver<TSource>
  289. {
  290. private readonly Buffer<TSource> _parent;
  291. public Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  292. : base(observer, cancel)
  293. {
  294. _parent = parent;
  295. }
  296. private object _gate;
  297. private IList<TSource> _s;
  298. private int _n;
  299. private int _windowId;
  300. private SerialDisposable _timerD;
  301. public IDisposable Run()
  302. {
  303. _gate = new object();
  304. _s = default(IList<TSource>);
  305. _n = 0;
  306. _windowId = 0;
  307. _timerD = new SerialDisposable();
  308. _s = new List<TSource>();
  309. CreateTimer(0);
  310. var subscription = _parent._source.SubscribeSafe(this);
  311. return StableCompositeDisposable.Create(_timerD, subscription);
  312. }
  313. private void CreateTimer(int id)
  314. {
  315. var m = new SingleAssignmentDisposable();
  316. _timerD.Disposable = m;
  317. m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
  318. }
  319. private IDisposable Tick(IScheduler self, int id)
  320. {
  321. var d = Disposable.Empty;
  322. var newId = 0;
  323. lock (_gate)
  324. {
  325. if (id != _windowId)
  326. return d;
  327. _n = 0;
  328. newId = ++_windowId;
  329. var res = _s;
  330. _s = new List<TSource>();
  331. base._observer.OnNext(res);
  332. CreateTimer(newId);
  333. }
  334. return d;
  335. }
  336. public void OnNext(TSource value)
  337. {
  338. var newWindow = false;
  339. var newId = 0;
  340. lock (_gate)
  341. {
  342. _s.Add(value);
  343. _n++;
  344. if (_n == _parent._count)
  345. {
  346. newWindow = true;
  347. _n = 0;
  348. newId = ++_windowId;
  349. var res = _s;
  350. _s = new List<TSource>();
  351. base._observer.OnNext(res);
  352. }
  353. if (newWindow)
  354. CreateTimer(newId);
  355. }
  356. }
  357. public void OnError(Exception error)
  358. {
  359. lock (_gate)
  360. {
  361. _s.Clear();
  362. base._observer.OnError(error);
  363. base.Dispose();
  364. }
  365. }
  366. public void OnCompleted()
  367. {
  368. lock (_gate)
  369. {
  370. base._observer.OnNext(_s);
  371. base._observer.OnCompleted();
  372. base.Dispose();
  373. }
  374. }
  375. }
  376. }
  377. internal sealed class Buffer<TSource, TBufferClosing> : Producer<IList<TSource>>
  378. {
  379. private readonly IObservable<TSource> _source;
  380. private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;
  381. private readonly IObservable<TBufferClosing> _bufferBoundaries;
  382. public Buffer(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)
  383. {
  384. _source = source;
  385. _bufferClosingSelector = bufferClosingSelector;
  386. }
  387. public Buffer(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries)
  388. {
  389. _source = source;
  390. _bufferBoundaries = bufferBoundaries;
  391. }
  392. protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  393. {
  394. if (_bufferClosingSelector != null)
  395. {
  396. var sink = new _(this, observer, cancel);
  397. setSink(sink);
  398. return sink.Run();
  399. }
  400. else
  401. {
  402. var sink = new Beta(this, observer, cancel);
  403. setSink(sink);
  404. return sink.Run();
  405. }
  406. }
  407. class _ : Sink<IList<TSource>>, IObserver<TSource>
  408. {
  409. private readonly Buffer<TSource, TBufferClosing> _parent;
  410. public _(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  411. : base(observer, cancel)
  412. {
  413. _parent = parent;
  414. }
  415. private IList<TSource> _buffer;
  416. private object _gate;
  417. private AsyncLock _bufferGate;
  418. private SerialDisposable _m;
  419. public IDisposable Run()
  420. {
  421. _buffer = new List<TSource>();
  422. _gate = new object();
  423. _bufferGate = new AsyncLock();
  424. _m = new SerialDisposable();
  425. var groupDisposable = new CompositeDisposable(2) { _m };
  426. groupDisposable.Add(_parent._source.SubscribeSafe(this));
  427. _bufferGate.Wait(CreateBufferClose);
  428. return groupDisposable;
  429. }
  430. private void CreateBufferClose()
  431. {
  432. var bufferClose = default(IObservable<TBufferClosing>);
  433. try
  434. {
  435. bufferClose = _parent._bufferClosingSelector();
  436. }
  437. catch (Exception exception)
  438. {
  439. lock (_gate)
  440. {
  441. base._observer.OnError(exception);
  442. base.Dispose();
  443. }
  444. return;
  445. }
  446. var closingSubscription = new SingleAssignmentDisposable();
  447. _m.Disposable = closingSubscription;
  448. closingSubscription.Disposable = bufferClose.SubscribeSafe(new Omega(this, closingSubscription));
  449. }
  450. private void CloseBuffer(IDisposable closingSubscription)
  451. {
  452. closingSubscription.Dispose();
  453. lock (_gate)
  454. {
  455. var res = _buffer;
  456. _buffer = new List<TSource>();
  457. base._observer.OnNext(res);
  458. }
  459. _bufferGate.Wait(CreateBufferClose);
  460. }
  461. class Omega : IObserver<TBufferClosing>
  462. {
  463. private readonly _ _parent;
  464. private readonly IDisposable _self;
  465. public Omega(_ parent, IDisposable self)
  466. {
  467. _parent = parent;
  468. _self = self;
  469. }
  470. public void OnNext(TBufferClosing value)
  471. {
  472. _parent.CloseBuffer(_self);
  473. }
  474. public void OnError(Exception error)
  475. {
  476. _parent.OnError(error);
  477. }
  478. public void OnCompleted()
  479. {
  480. _parent.CloseBuffer(_self);
  481. }
  482. }
  483. public void OnNext(TSource value)
  484. {
  485. lock (_gate)
  486. {
  487. _buffer.Add(value);
  488. }
  489. }
  490. public void OnError(Exception error)
  491. {
  492. lock (_gate)
  493. {
  494. _buffer.Clear();
  495. base._observer.OnError(error);
  496. base.Dispose();
  497. }
  498. }
  499. public void OnCompleted()
  500. {
  501. lock (_gate)
  502. {
  503. base._observer.OnNext(_buffer);
  504. base._observer.OnCompleted();
  505. base.Dispose();
  506. }
  507. }
  508. }
  509. class Beta : Sink<IList<TSource>>, IObserver<TSource>
  510. {
  511. private readonly Buffer<TSource, TBufferClosing> _parent;
  512. public Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  513. : base(observer, cancel)
  514. {
  515. _parent = parent;
  516. }
  517. private IList<TSource> _buffer;
  518. private object _gate;
  519. private RefCountDisposable _refCountDisposable;
  520. public IDisposable Run()
  521. {
  522. _buffer = new List<TSource>();
  523. _gate = new object();
  524. var d = new CompositeDisposable(2);
  525. _refCountDisposable = new RefCountDisposable(d);
  526. d.Add(_parent._source.SubscribeSafe(this));
  527. d.Add(_parent._bufferBoundaries.SubscribeSafe(new Omega(this)));
  528. return _refCountDisposable;
  529. }
  530. class Omega : IObserver<TBufferClosing>
  531. {
  532. private readonly Beta _parent;
  533. public Omega(Beta parent)
  534. {
  535. _parent = parent;
  536. }
  537. public void OnNext(TBufferClosing value)
  538. {
  539. lock (_parent._gate)
  540. {
  541. var res = _parent._buffer;
  542. _parent._buffer = new List<TSource>();
  543. _parent._observer.OnNext(res);
  544. }
  545. }
  546. public void OnError(Exception error)
  547. {
  548. _parent.OnError(error);
  549. }
  550. public void OnCompleted()
  551. {
  552. _parent.OnCompleted();
  553. }
  554. }
  555. public void OnNext(TSource value)
  556. {
  557. lock (_gate)
  558. {
  559. _buffer.Add(value);
  560. }
  561. }
  562. public void OnError(Exception error)
  563. {
  564. lock (_gate)
  565. {
  566. _buffer.Clear();
  567. base._observer.OnError(error);
  568. base.Dispose();
  569. }
  570. }
  571. public void OnCompleted()
  572. {
  573. lock (_gate)
  574. {
  575. base._observer.OnNext(_buffer);
  576. base._observer.OnCompleted();
  577. base.Dispose();
  578. }
  579. }
  580. }
  581. }
  582. }