1
0

Buffer.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Disposables;
  10. using System.Threading;
  11. namespace System.Reactive.Linq.ObservableImpl
  12. {
  13. class Buffer<TSource> : Producer<IList<TSource>>
  14. {
  15. private readonly IObservable<TSource> _source;
  16. private readonly int _count;
  17. private readonly int _skip;
  18. private readonly TimeSpan _timeSpan;
  19. private readonly TimeSpan _timeShift;
  20. private readonly IScheduler _scheduler;
  21. public Buffer(IObservable<TSource> source, int count, int skip)
  22. {
  23. _source = source;
  24. _count = count;
  25. _skip = skip;
  26. }
  27. public Buffer(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  28. {
  29. _source = source;
  30. _timeSpan = timeSpan;
  31. _timeShift = timeShift;
  32. _scheduler = scheduler;
  33. }
  34. public Buffer(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  35. {
  36. _source = source;
  37. _timeSpan = timeSpan;
  38. _count = count;
  39. _scheduler = scheduler;
  40. }
  41. protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  42. {
  43. if (_scheduler == null)
  44. {
  45. var sink = new _(this, observer, cancel);
  46. setSink(sink);
  47. return sink.Run();
  48. }
  49. else if (_count > 0)
  50. {
  51. var sink = new Impl(this, observer, cancel);
  52. setSink(sink);
  53. return sink.Run();
  54. }
  55. else
  56. {
  57. if (_timeSpan == _timeShift)
  58. {
  59. var sink = new BufferTimeShift(this, observer, cancel);
  60. setSink(sink);
  61. return sink.Run();
  62. }
  63. else
  64. {
  65. var sink = new BufferImpl(this, observer, cancel);
  66. setSink(sink);
  67. return sink.Run();
  68. }
  69. }
  70. }
  71. class _ : Sink<IList<TSource>>, IObserver<TSource>
  72. {
  73. private readonly Buffer<TSource> _parent;
  74. public _(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  75. : base(observer, cancel)
  76. {
  77. _parent = parent;
  78. }
  79. private Queue<IList<TSource>> _queue;
  80. private int _n;
  81. public IDisposable Run()
  82. {
  83. _queue = new Queue<IList<TSource>>();
  84. _n = 0;
  85. CreateWindow();
  86. return _parent._source.SubscribeSafe(this);
  87. }
  88. private void CreateWindow()
  89. {
  90. var s = new List<TSource>();
  91. _queue.Enqueue(s);
  92. }
  93. public void OnNext(TSource value)
  94. {
  95. foreach (var s in _queue)
  96. s.Add(value);
  97. var c = _n - _parent._count + 1;
  98. if (c >= 0 && c % _parent._skip == 0)
  99. {
  100. var s = _queue.Dequeue();
  101. if (s.Count > 0)
  102. base._observer.OnNext(s);
  103. }
  104. _n++;
  105. if (_n % _parent._skip == 0)
  106. CreateWindow();
  107. }
  108. public void OnError(Exception error)
  109. {
  110. while (_queue.Count > 0)
  111. _queue.Dequeue().Clear();
  112. base._observer.OnError(error);
  113. base.Dispose();
  114. }
  115. public void OnCompleted()
  116. {
  117. while (_queue.Count > 0)
  118. {
  119. var s = _queue.Dequeue();
  120. if (s.Count > 0)
  121. base._observer.OnNext(s);
  122. }
  123. base._observer.OnCompleted();
  124. base.Dispose();
  125. }
  126. }
  127. class BufferImpl : Sink<IList<TSource>>, IObserver<TSource>
  128. {
  129. private readonly Buffer<TSource> _parent;
  130. public BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  131. : base(observer, cancel)
  132. {
  133. _parent = parent;
  134. }
  135. private TimeSpan _totalTime;
  136. private TimeSpan _nextShift;
  137. private TimeSpan _nextSpan;
  138. private object _gate;
  139. private Queue<List<TSource>> _q;
  140. private SerialDisposable _timerD;
  141. public IDisposable Run()
  142. {
  143. _totalTime = TimeSpan.Zero;
  144. _nextShift = _parent._timeShift;
  145. _nextSpan = _parent._timeSpan;
  146. _gate = new object();
  147. _q = new Queue<List<TSource>>();
  148. _timerD = new SerialDisposable();
  149. CreateWindow();
  150. CreateTimer();
  151. var subscription = _parent._source.SubscribeSafe(this);
  152. return StableCompositeDisposable.Create(_timerD, subscription);
  153. }
  154. private void CreateWindow()
  155. {
  156. var s = new List<TSource>();
  157. _q.Enqueue(s);
  158. }
  159. private void CreateTimer()
  160. {
  161. var m = new SingleAssignmentDisposable();
  162. _timerD.Disposable = m;
  163. var isSpan = false;
  164. var isShift = false;
  165. if (_nextSpan == _nextShift)
  166. {
  167. isSpan = true;
  168. isShift = true;
  169. }
  170. else if (_nextSpan < _nextShift)
  171. isSpan = true;
  172. else
  173. isShift = true;
  174. var newTotalTime = isSpan ? _nextSpan : _nextShift;
  175. var ts = newTotalTime - _totalTime;
  176. _totalTime = newTotalTime;
  177. if (isSpan)
  178. _nextSpan += _parent._timeShift;
  179. if (isShift)
  180. _nextShift += _parent._timeShift;
  181. m.Disposable = _parent._scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
  182. }
  183. struct State
  184. {
  185. public bool isSpan;
  186. public bool isShift;
  187. }
  188. private IDisposable Tick(IScheduler self, State state)
  189. {
  190. lock (_gate)
  191. {
  192. //
  193. // Before v2, the two operations below were reversed. This doesn't have an observable
  194. // difference for Buffer, but is done to keep code consistent with Window, where we
  195. // took a breaking change in v2 to ensure consistency across overloads. For more info,
  196. // see the comment in Tick for Window.
  197. //
  198. if (state.isSpan)
  199. {
  200. var s = _q.Dequeue();
  201. base._observer.OnNext(s);
  202. }
  203. if (state.isShift)
  204. {
  205. CreateWindow();
  206. }
  207. }
  208. CreateTimer();
  209. return Disposable.Empty;
  210. }
  211. public void OnNext(TSource value)
  212. {
  213. lock (_gate)
  214. {
  215. foreach (var s in _q)
  216. s.Add(value);
  217. }
  218. }
  219. public void OnError(Exception error)
  220. {
  221. lock (_gate)
  222. {
  223. while (_q.Count > 0)
  224. _q.Dequeue().Clear();
  225. base._observer.OnError(error);
  226. base.Dispose();
  227. }
  228. }
  229. public void OnCompleted()
  230. {
  231. lock (_gate)
  232. {
  233. while (_q.Count > 0)
  234. base._observer.OnNext(_q.Dequeue());
  235. base._observer.OnCompleted();
  236. base.Dispose();
  237. }
  238. }
  239. }
  240. class BufferTimeShift : Sink<IList<TSource>>, IObserver<TSource>
  241. {
  242. private readonly Buffer<TSource> _parent;
  243. public BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  244. : base(observer, cancel)
  245. {
  246. _parent = parent;
  247. }
  248. private object _gate;
  249. private List<TSource> _list;
  250. public IDisposable Run()
  251. {
  252. _gate = new object();
  253. _list = new List<TSource>();
  254. var d = _parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick);
  255. var s = _parent._source.SubscribeSafe(this);
  256. return StableCompositeDisposable.Create(d, s);
  257. }
  258. private void Tick()
  259. {
  260. lock (_gate)
  261. {
  262. base._observer.OnNext(_list);
  263. _list = new List<TSource>();
  264. }
  265. }
  266. public void OnNext(TSource value)
  267. {
  268. lock (_gate)
  269. {
  270. _list.Add(value);
  271. }
  272. }
  273. public void OnError(Exception error)
  274. {
  275. lock (_gate)
  276. {
  277. _list.Clear();
  278. base._observer.OnError(error);
  279. base.Dispose();
  280. }
  281. }
  282. public void OnCompleted()
  283. {
  284. lock (_gate)
  285. {
  286. base._observer.OnNext(_list);
  287. base._observer.OnCompleted();
  288. base.Dispose();
  289. }
  290. }
  291. }
  292. class Impl : Sink<IList<TSource>>, IObserver<TSource>
  293. {
  294. private readonly Buffer<TSource> _parent;
  295. public Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  296. : base(observer, cancel)
  297. {
  298. _parent = parent;
  299. }
  300. private object _gate;
  301. private IList<TSource> _s;
  302. private int _n;
  303. private int _windowId;
  304. private SerialDisposable _timerD;
  305. public IDisposable Run()
  306. {
  307. _gate = new object();
  308. _s = default(IList<TSource>);
  309. _n = 0;
  310. _windowId = 0;
  311. _timerD = new SerialDisposable();
  312. _s = new List<TSource>();
  313. CreateTimer(0);
  314. var subscription = _parent._source.SubscribeSafe(this);
  315. return StableCompositeDisposable.Create(_timerD, subscription);
  316. }
  317. private void CreateTimer(int id)
  318. {
  319. var m = new SingleAssignmentDisposable();
  320. _timerD.Disposable = m;
  321. m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
  322. }
  323. private IDisposable Tick(IScheduler self, int id)
  324. {
  325. var d = Disposable.Empty;
  326. var newId = 0;
  327. lock (_gate)
  328. {
  329. if (id != _windowId)
  330. return d;
  331. _n = 0;
  332. newId = ++_windowId;
  333. var res = _s;
  334. _s = new List<TSource>();
  335. base._observer.OnNext(res);
  336. CreateTimer(newId);
  337. }
  338. return d;
  339. }
  340. public void OnNext(TSource value)
  341. {
  342. var newWindow = false;
  343. var newId = 0;
  344. lock (_gate)
  345. {
  346. _s.Add(value);
  347. _n++;
  348. if (_n == _parent._count)
  349. {
  350. newWindow = true;
  351. _n = 0;
  352. newId = ++_windowId;
  353. var res = _s;
  354. _s = new List<TSource>();
  355. base._observer.OnNext(res);
  356. }
  357. if (newWindow)
  358. CreateTimer(newId);
  359. }
  360. }
  361. public void OnError(Exception error)
  362. {
  363. lock (_gate)
  364. {
  365. _s.Clear();
  366. base._observer.OnError(error);
  367. base.Dispose();
  368. }
  369. }
  370. public void OnCompleted()
  371. {
  372. lock (_gate)
  373. {
  374. base._observer.OnNext(_s);
  375. base._observer.OnCompleted();
  376. base.Dispose();
  377. }
  378. }
  379. }
  380. }
  381. class Buffer<TSource, TBufferClosing> : Producer<IList<TSource>>
  382. {
  383. private readonly IObservable<TSource> _source;
  384. private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;
  385. private readonly IObservable<TBufferClosing> _bufferBoundaries;
  386. public Buffer(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)
  387. {
  388. _source = source;
  389. _bufferClosingSelector = bufferClosingSelector;
  390. }
  391. public Buffer(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries)
  392. {
  393. _source = source;
  394. _bufferBoundaries = bufferBoundaries;
  395. }
  396. protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  397. {
  398. if (_bufferClosingSelector != null)
  399. {
  400. var sink = new _(this, observer, cancel);
  401. setSink(sink);
  402. return sink.Run();
  403. }
  404. else
  405. {
  406. var sink = new Beta(this, observer, cancel);
  407. setSink(sink);
  408. return sink.Run();
  409. }
  410. }
  411. class _ : Sink<IList<TSource>>, IObserver<TSource>
  412. {
  413. private readonly Buffer<TSource, TBufferClosing> _parent;
  414. public _(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  415. : base(observer, cancel)
  416. {
  417. _parent = parent;
  418. }
  419. private IList<TSource> _buffer;
  420. private object _gate;
  421. private AsyncLock _bufferGate;
  422. private SerialDisposable _m;
  423. public IDisposable Run()
  424. {
  425. _buffer = new List<TSource>();
  426. _gate = new object();
  427. _bufferGate = new AsyncLock();
  428. _m = new SerialDisposable();
  429. var groupDisposable = new CompositeDisposable(2) { _m };
  430. groupDisposable.Add(_parent._source.SubscribeSafe(this));
  431. _bufferGate.Wait(CreateBufferClose);
  432. return groupDisposable;
  433. }
  434. private void CreateBufferClose()
  435. {
  436. var bufferClose = default(IObservable<TBufferClosing>);
  437. try
  438. {
  439. bufferClose = _parent._bufferClosingSelector();
  440. }
  441. catch (Exception exception)
  442. {
  443. lock (_gate)
  444. {
  445. base._observer.OnError(exception);
  446. base.Dispose();
  447. }
  448. return;
  449. }
  450. var closingSubscription = new SingleAssignmentDisposable();
  451. _m.Disposable = closingSubscription;
  452. closingSubscription.Disposable = bufferClose.SubscribeSafe(new Omega(this, closingSubscription));
  453. }
  454. private void CloseBuffer(IDisposable closingSubscription)
  455. {
  456. closingSubscription.Dispose();
  457. lock (_gate)
  458. {
  459. var res = _buffer;
  460. _buffer = new List<TSource>();
  461. base._observer.OnNext(res);
  462. }
  463. _bufferGate.Wait(CreateBufferClose);
  464. }
  465. class Omega : IObserver<TBufferClosing>
  466. {
  467. private readonly _ _parent;
  468. private readonly IDisposable _self;
  469. public Omega(_ parent, IDisposable self)
  470. {
  471. _parent = parent;
  472. _self = self;
  473. }
  474. public void OnNext(TBufferClosing value)
  475. {
  476. _parent.CloseBuffer(_self);
  477. }
  478. public void OnError(Exception error)
  479. {
  480. _parent.OnError(error);
  481. }
  482. public void OnCompleted()
  483. {
  484. _parent.CloseBuffer(_self);
  485. }
  486. }
  487. public void OnNext(TSource value)
  488. {
  489. lock (_gate)
  490. {
  491. _buffer.Add(value);
  492. }
  493. }
  494. public void OnError(Exception error)
  495. {
  496. lock (_gate)
  497. {
  498. _buffer.Clear();
  499. base._observer.OnError(error);
  500. base.Dispose();
  501. }
  502. }
  503. public void OnCompleted()
  504. {
  505. lock (_gate)
  506. {
  507. base._observer.OnNext(_buffer);
  508. base._observer.OnCompleted();
  509. base.Dispose();
  510. }
  511. }
  512. }
  513. class Beta : Sink<IList<TSource>>, IObserver<TSource>
  514. {
  515. private readonly Buffer<TSource, TBufferClosing> _parent;
  516. public Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
  517. : base(observer, cancel)
  518. {
  519. _parent = parent;
  520. }
  521. private IList<TSource> _buffer;
  522. private object _gate;
  523. private RefCountDisposable _refCountDisposable;
  524. public IDisposable Run()
  525. {
  526. _buffer = new List<TSource>();
  527. _gate = new object();
  528. var d = new CompositeDisposable(2);
  529. _refCountDisposable = new RefCountDisposable(d);
  530. d.Add(_parent._source.SubscribeSafe(this));
  531. d.Add(_parent._bufferBoundaries.SubscribeSafe(new Omega(this)));
  532. return _refCountDisposable;
  533. }
  534. class Omega : IObserver<TBufferClosing>
  535. {
  536. private readonly Beta _parent;
  537. public Omega(Beta parent)
  538. {
  539. _parent = parent;
  540. }
  541. public void OnNext(TBufferClosing value)
  542. {
  543. lock (_parent._gate)
  544. {
  545. var res = _parent._buffer;
  546. _parent._buffer = new List<TSource>();
  547. _parent._observer.OnNext(res);
  548. }
  549. }
  550. public void OnError(Exception error)
  551. {
  552. _parent.OnError(error);
  553. }
  554. public void OnCompleted()
  555. {
  556. _parent.OnCompleted();
  557. }
  558. }
  559. public void OnNext(TSource value)
  560. {
  561. lock (_gate)
  562. {
  563. _buffer.Add(value);
  564. }
  565. }
  566. public void OnError(Exception error)
  567. {
  568. lock (_gate)
  569. {
  570. _buffer.Clear();
  571. base._observer.OnError(error);
  572. base.Dispose();
  573. }
  574. }
  575. public void OnCompleted()
  576. {
  577. lock (_gate)
  578. {
  579. base._observer.OnNext(_buffer);
  580. base._observer.OnCompleted();
  581. base.Dispose();
  582. }
  583. }
  584. }
  585. }
  586. }
  587. #endif