Buffer.cs 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Threading;
  9. namespace System.Reactive.Linq.ObservableImpl
  10. {
  11. class Buffer<TSource> : Producer<IList<TSource>>
  12. {
  /// <summary>Sequence whose elements are collected into buffers.</summary>
  private readonly IObservable<TSource> _source;

  /// <summary>Element count used by the count-based and the time-plus-count sinks; stays 0 when no count is supplied.</summary>
  private readonly int _count;

  /// <summary>Number of source elements between the starts of consecutive count-based buffers.</summary>
  private readonly int _skip;

  /// <summary>Time span used by the scheduler-driven sinks.</summary>
  private readonly TimeSpan _timeSpan;

  /// <summary>Time shift used by the scheduler-driven sinks; stays at its default when not supplied.</summary>
  private readonly TimeSpan _timeShift;

  /// <summary>Scheduler that runs the timers; null selects the purely count-based sink in <c>Run</c>.</summary>
  private readonly IScheduler _scheduler;

  /// <summary>Creates a count/skip-based buffer producer (no scheduler).</summary>
  /// <param name="source">Sequence to buffer.</param>
  /// <param name="count">Element count per buffer.</param>
  /// <param name="skip">Number of elements between the starts of consecutive buffers.</param>
  public Buffer(IObservable<TSource> source, int count, int skip)
  {
      _skip = skip;
      _count = count;
      _source = source;
  }

  /// <summary>Creates a time-based buffer producer driven by a time span and a time shift.</summary>
  /// <param name="source">Sequence to buffer.</param>
  /// <param name="timeSpan">Time span of each buffer.</param>
  /// <param name="timeShift">Time shift between buffers.</param>
  /// <param name="scheduler">Scheduler used to run the timers.</param>
  public Buffer(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  {
      _scheduler = scheduler;
      _timeShift = timeShift;
      _timeSpan = timeSpan;
      _source = source;
  }

  /// <summary>Creates a buffer producer whose buffers are closed by a timer or by reaching an element count.</summary>
  /// <param name="source">Sequence to buffer.</param>
  /// <param name="timeSpan">Time span after which the current buffer is emitted.</param>
  /// <param name="count">Element count at which the current buffer is emitted.</param>
  /// <param name="scheduler">Scheduler used to run the timers.</param>
  public Buffer(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  {
      _scheduler = scheduler;
      _count = count;
      _timeSpan = timeSpan;
      _source = source;
  }
  /// <summary>
  /// Picks the sink matching the way this producer was constructed, registers it
  /// through <paramref name="setSink"/> and starts it.
  /// </summary>
  /// <param name="observer">Observer that receives the emitted buffers.</param>
  /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
  /// <param name="setSink">Callback that receives the created sink.</param>
  /// <returns>The disposable returned by the selected sink's <c>Run</c> method.</returns>
  protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      // No scheduler: buffering is driven by element count and skip only.
      if (_scheduler == null)
      {
          var countSink = new _(this, observer, cancel);
          setSink(countSink);
          return countSink.Run();
      }

      // Scheduler plus a positive count: buffers close on a timer or on reaching the count.
      if (_count > 0)
      {
          var timeOrCountSink = new Impl(this, observer, cancel);
          setSink(timeOrCountSink);
          return timeOrCountSink.Run();
      }

      // Span equal to shift: a single periodic timer is sufficient.
      if (_timeSpan == _timeShift)
      {
          var periodicSink = new BufferTimeShift(this, observer, cancel);
          setSink(periodicSink);
          return periodicSink.Run();
      }

      // Span and shift differ: use the sink that tracks a queue of buffers.
      var spanShiftSink = new BufferImpl(this, observer, cancel);
      setSink(spanShiftSink);
      return spanShiftSink.Run();
  }
  /// <summary>
  /// Sink for count/skip-based buffering. Keeps a queue of open buffers; every
  /// incoming element is appended to all of them.
  /// </summary>
  class _ : Sink<IList<TSource>>, IObserver<TSource>
  {
      private readonly Buffer<TSource> _parent;

      // Buffers that are currently open, oldest first.
      private Queue<IList<TSource>> _queue;

      // Number of source elements seen so far.
      private int _n;

      public _(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      /// <summary>Initialises state, opens the first buffer and subscribes to the source.</summary>
      /// <returns>The subscription to the source sequence.</returns>
      public IDisposable Run()
      {
          _n = 0;
          _queue = new Queue<IList<TSource>>();
          CreateWindow();

          return _parent._source.SubscribeSafe(this);
      }

      /// <summary>Opens a new, empty buffer at the tail of the queue.</summary>
      private void CreateWindow()
      {
          _queue.Enqueue(new List<TSource>());
      }

      /// <summary>Adds the element to every open buffer, then closes and/or opens buffers as dictated by count and skip.</summary>
      public void OnNext(TSource value)
      {
          foreach (var open in _queue)
          {
              open.Add(value);
          }

          // Non-negative multiples of skip mark the positions where the oldest buffer is closed.
          var offset = _n - _parent._count + 1;
          if (offset >= 0 && offset % _parent._skip == 0)
          {
              var closed = _queue.Dequeue();
              if (closed.Count > 0)
              {
                  base._observer.OnNext(closed);
              }
          }

          _n++;

          // Every skip-th element starts a new buffer.
          if (_n % _parent._skip == 0)
          {
              CreateWindow();
          }
      }

      /// <summary>Clears and discards all open buffers, forwards the error and disposes the sink.</summary>
      public void OnError(Exception error)
      {
          foreach (var open in _queue)
          {
              open.Clear();
          }
          _queue.Clear();

          base._observer.OnError(error);
          base.Dispose();
      }

      /// <summary>Emits every non-empty open buffer, then completes and disposes the sink.</summary>
      public void OnCompleted()
      {
          while (_queue.Count > 0)
          {
              var remaining = _queue.Dequeue();
              if (remaining.Count > 0)
              {
                  base._observer.OnNext(remaining);
              }
          }

          base._observer.OnCompleted();
          base.Dispose();
      }
  }
  /// <summary>
  /// Sink for time-based buffering where the time span differs from the time shift.
  /// Maintains a queue of open buffers and a single re-armed timer whose ticks
  /// close the oldest buffer and/or open a new one.
  /// </summary>
  class BufferImpl : Sink<IList<TSource>>, IObserver<TSource>
  {
      private readonly Buffer<TSource> _parent;

      public BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // Due time (relative to Run) of the most recently scheduled tick.
      private TimeSpan _totalTime;
      // Due time of the next tick that opens a buffer.
      private TimeSpan _nextShift;
      // Due time of the next tick that closes a buffer.
      private TimeSpan _nextSpan;
      // Guards _q and the calls made to the downstream observer.
      private object _gate;
      // Buffers that are currently open, oldest first.
      private Queue<List<TSource>> _q;
      // Holds the currently scheduled timer.
      private SerialDisposable _timerD;

      /// <summary>Initialises state, opens the first buffer, arms the first timer and subscribes to the source.</summary>
      /// <returns>A disposable combining the timer holder and the source subscription.</returns>
      public IDisposable Run()
      {
          _totalTime = TimeSpan.Zero;
          _nextShift = _parent._timeShift;
          _nextSpan = _parent._timeSpan;

          _gate = new object();
          _q = new Queue<List<TSource>>();
          _timerD = new SerialDisposable();

          CreateWindow();
          CreateTimer();

          var subscription = _parent._source.SubscribeSafe(this);
          return StableCompositeDisposable.Create(_timerD, subscription);
      }

      /// <summary>Opens a new, empty buffer at the tail of the queue.</summary>
      private void CreateWindow()
      {
          _q.Enqueue(new List<TSource>());
      }

      /// <summary>
      /// Schedules the next tick at whichever of the next span/shift boundaries comes
      /// first (both at once when they coincide) and advances the chosen boundaries
      /// by one time shift.
      /// </summary>
      private void CreateTimer()
      {
          var m = new SingleAssignmentDisposable();
          _timerD.Disposable = m;

          // Equal boundaries trigger both actions in the same tick.
          var isSpan = _nextSpan <= _nextShift;
          var isShift = _nextShift <= _nextSpan;

          var newTotalTime = isSpan ? _nextSpan : _nextShift;
          var ts = newTotalTime - _totalTime;
          _totalTime = newTotalTime;

          if (isSpan)
          {
              _nextSpan += _parent._timeShift;
          }

          if (isShift)
          {
              _nextShift += _parent._timeShift;
          }

          m.Disposable = _parent._scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
      }

      /// <summary>Describes what a scheduled tick has to do.</summary>
      struct State
      {
          // True when the tick closes the oldest buffer.
          public bool isSpan;
          // True when the tick opens a new buffer.
          public bool isShift;
      }

      /// <summary>Timer callback: closes and/or opens a buffer as described by <paramref name="state"/>, then re-arms the timer.</summary>
      private IDisposable Tick(IScheduler self, State state)
      {
          lock (_gate)
          {
              //
              // Before v2, the two operations below were reversed. This doesn't have an observable
              // difference for Buffer, but is done to keep code consistent with Window, where we
              // took a breaking change in v2 to ensure consistency across overloads. For more info,
              // see the comment in Tick for Window.
              //
              // OnError and OnCompleted drain _q under this same gate, so a tick that acquires the
              // lock after termination would find the queue empty; Queue<T>.Dequeue throws
              // InvalidOperationException in that case, hence the Count check.
              //
              if (state.isSpan && _q.Count > 0)
              {
                  var s = _q.Dequeue();
                  base._observer.OnNext(s);
              }

              if (state.isShift)
              {
                  CreateWindow();
              }
          }

          CreateTimer();

          return Disposable.Empty;
      }

      /// <summary>Appends the element to every open buffer.</summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              foreach (var s in _q)
              {
                  s.Add(value);
              }
          }
      }

      /// <summary>Clears and discards all open buffers, forwards the error and disposes the sink.</summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              while (_q.Count > 0)
              {
                  _q.Dequeue().Clear();
              }

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>Emits every open buffer (including empty ones), then completes and disposes the sink.</summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              while (_q.Count > 0)
              {
                  base._observer.OnNext(_q.Dequeue());
              }

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  /// <summary>
  /// Sink for time-based buffering where span and shift are equal: a periodic timer
  /// emits the current list and replaces it with a fresh one.
  /// </summary>
  class BufferTimeShift : Sink<IList<TSource>>, IObserver<TSource>
  {
      private readonly Buffer<TSource> _parent;

      // Guards _list and the calls made to the downstream observer.
      private object _gate;

      // Elements collected since the last tick.
      private List<TSource> _list;

      public BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      /// <summary>Initialises state, starts the periodic timer and subscribes to the source.</summary>
      /// <returns>A disposable combining the periodic timer and the source subscription.</returns>
      public IDisposable Run()
      {
          _gate = new object();
          _list = new List<TSource>();

          var timer = _parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick);
          var subscription = _parent._source.SubscribeSafe(this);

          return StableCompositeDisposable.Create(timer, subscription);
      }

      /// <summary>Periodic callback: emits the current list and starts a new one.</summary>
      private void Tick()
      {
          lock (_gate)
          {
              base._observer.OnNext(_list);
              _list = new List<TSource>();
          }
      }

      /// <summary>Appends the element to the current list.</summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              _list.Add(value);
          }
      }

      /// <summary>Clears the current list, forwards the error and disposes the sink.</summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              _list.Clear();

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>Emits the current list, then completes and disposes the sink.</summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              base._observer.OnNext(_list);

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  /// <summary>
  /// Sink for buffering by time and count: the current buffer is emitted either when
  /// its timer fires or when it has received the configured number of elements.
  /// </summary>
  class Impl : Sink<IList<TSource>>, IObserver<TSource>
  {
      private readonly Buffer<TSource> _parent;

      // Guards all mutable state below and the calls made to the downstream observer.
      private object _gate;
      // Buffer currently being filled.
      private IList<TSource> _s;
      // Number of elements in the current buffer.
      private int _n;
      // Identifier of the current buffer; lets a timer tick detect that its buffer was already closed.
      private int _windowId;
      // Holds the currently scheduled timer.
      private SerialDisposable _timerD;

      public Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      /// <summary>Initialises state, arms the timer for the first buffer and subscribes to the source.</summary>
      /// <returns>A disposable combining the timer holder and the source subscription.</returns>
      public IDisposable Run()
      {
          _gate = new object();
          _n = 0;
          _windowId = 0;
          _timerD = new SerialDisposable();
          _s = new List<TSource>();

          CreateTimer(0);

          var sourceSubscription = _parent._source.SubscribeSafe(this);
          return StableCompositeDisposable.Create(_timerD, sourceSubscription);
      }

      /// <summary>Schedules a tick for the buffer with the given identifier, replacing the timer held in <c>_timerD</c>.</summary>
      private void CreateTimer(int id)
      {
          var slot = new SingleAssignmentDisposable();
          _timerD.Disposable = slot;

          slot.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
      }

      /// <summary>
      /// Timer callback. When the buffer identified by <paramref name="id"/> is still the
      /// current one, emits it, starts a new buffer and arms a timer for it; otherwise does nothing.
      /// </summary>
      private IDisposable Tick(IScheduler self, int id)
      {
          var result = Disposable.Empty;

          lock (_gate)
          {
              if (id == _windowId)
              {
                  _n = 0;
                  var nextId = ++_windowId;

                  var finished = _s;
                  _s = new List<TSource>();
                  base._observer.OnNext(finished);

                  CreateTimer(nextId);
              }
          }

          return result;
      }

      /// <summary>Appends the element; when the buffer reaches the configured count, emits it and starts a new buffer with a fresh timer.</summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              _s.Add(value);
              _n++;

              if (_n == _parent._count)
              {
                  _n = 0;
                  var nextId = ++_windowId;

                  var finished = _s;
                  _s = new List<TSource>();
                  base._observer.OnNext(finished);

                  CreateTimer(nextId);
              }
          }
      }

      /// <summary>Clears the current buffer, forwards the error and disposes the sink.</summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              _s.Clear();

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>Emits the current buffer, then completes and disposes the sink.</summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              base._observer.OnNext(_s);

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  378. }
  379. class Buffer<TSource, TBufferClosing> : Producer<IList<TSource>>
  380. {
  /// <summary>Sequence whose elements are collected into buffers.</summary>
  private readonly IObservable<TSource> _source;

  /// <summary>Factory for the sequence that closes the current buffer; null when boundaries are used instead.</summary>
  private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;

  /// <summary>Sequence whose elements mark buffer boundaries; null when a closing selector is used instead.</summary>
  private readonly IObservable<TBufferClosing> _bufferBoundaries;

  /// <summary>Creates a producer whose buffers are closed by sequences obtained from a selector function.</summary>
  /// <param name="source">Sequence to buffer.</param>
  /// <param name="bufferClosingSelector">Function invoked to obtain the closing sequence for each buffer.</param>
  public Buffer(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)
  {
      _bufferClosingSelector = bufferClosingSelector;
      _source = source;
  }

  /// <summary>Creates a producer whose buffers are delimited by the elements of a boundary sequence.</summary>
  /// <param name="source">Sequence to buffer.</param>
  /// <param name="bufferBoundaries">Sequence whose elements close the current buffer.</param>
  public Buffer(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries)
  {
      _bufferBoundaries = bufferBoundaries;
      _source = source;
  }
  /// <summary>
  /// Picks the sink matching the way this producer was constructed, registers it
  /// through <paramref name="setSink"/> and starts it.
  /// </summary>
  /// <param name="observer">Observer that receives the emitted buffers.</param>
  /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
  /// <param name="setSink">Callback that receives the created sink.</param>
  /// <returns>The disposable returned by the selected sink's <c>Run</c> method.</returns>
  protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      // No closing selector: buffers are delimited by the boundary sequence.
      if (_bufferClosingSelector == null)
      {
          var boundarySink = new Beta(this, observer, cancel);
          setSink(boundarySink);
          return boundarySink.Run();
      }

      var selectorSink = new _(this, observer, cancel);
      setSink(selectorSink);
      return selectorSink.Run();
  }
  /// <summary>
  /// Sink for buffering with a closing selector: each buffer is closed by the first
  /// notification (element or completion) of a sequence obtained from the selector.
  /// </summary>
  class _ : Sink<IList<TSource>>, IObserver<TSource>
  {
      private readonly Buffer<TSource, TBufferClosing> _parent;

      // Buffer currently being filled.
      private IList<TSource> _buffer;
      // Guards _buffer and the calls made to the downstream observer.
      private object _gate;
      // Queues the work of creating the next closing subscription.
      private AsyncLock _bufferGate;
      // Holds the subscription to the current closing sequence.
      private SerialDisposable _m;

      public _(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      /// <summary>Initialises state, subscribes to the source and requests the first closing sequence.</summary>
      /// <returns>A composite of the closing-subscription holder and the source subscription.</returns>
      public IDisposable Run()
      {
          _buffer = new List<TSource>();
          _gate = new object();
          _bufferGate = new AsyncLock();
          _m = new SerialDisposable();

          var subscriptions = new CompositeDisposable(2);
          subscriptions.Add(_m);
          subscriptions.Add(_parent._source.SubscribeSafe(this));

          _bufferGate.Wait(CreateBufferClose);

          return subscriptions;
      }

      /// <summary>
      /// Invokes the closing selector and subscribes to the sequence it returns.
      /// An exception thrown by the selector is forwarded as an error and ends the sink.
      /// </summary>
      private void CreateBufferClose()
      {
          IObservable<TBufferClosing> closing;
          try
          {
              closing = _parent._bufferClosingSelector();
          }
          catch (Exception exception)
          {
              lock (_gate)
              {
                  base._observer.OnError(exception);
                  base.Dispose();
              }

              return;
          }

          var handle = new SingleAssignmentDisposable();
          _m.Disposable = handle;
          handle.Disposable = closing.SubscribeSafe(new Omega(this, handle));
      }

      /// <summary>
      /// Disposes the given closing subscription, emits the current buffer, starts a new
      /// one and requests the next closing sequence.
      /// </summary>
      private void CloseBuffer(IDisposable closingSubscription)
      {
          closingSubscription.Dispose();

          lock (_gate)
          {
              var finished = _buffer;
              _buffer = new List<TSource>();
              base._observer.OnNext(finished);
          }

          _bufferGate.Wait(CreateBufferClose);
      }

      /// <summary>Observer of a closing sequence; both an element and completion close the current buffer.</summary>
      class Omega : IObserver<TBufferClosing>
      {
          private readonly _ _parent;
          // Subscription to the closing sequence this observer is attached to.
          private readonly IDisposable _self;

          public Omega(_ parent, IDisposable self)
          {
              _parent = parent;
              _self = self;
          }

          public void OnNext(TBufferClosing value)
          {
              _parent.CloseBuffer(_self);
          }

          public void OnError(Exception error)
          {
              _parent.OnError(error);
          }

          public void OnCompleted()
          {
              _parent.CloseBuffer(_self);
          }
      }

      /// <summary>Appends the element to the current buffer.</summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              _buffer.Add(value);
          }
      }

      /// <summary>Clears the current buffer, forwards the error and disposes the sink.</summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              _buffer.Clear();

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>Emits the current buffer, then completes and disposes the sink.</summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              base._observer.OnNext(_buffer);

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  /// <summary>
  /// Sink for buffering with a boundary sequence: every boundary element emits the
  /// current buffer and starts a new one.
  /// </summary>
  class Beta : Sink<IList<TSource>>, IObserver<TSource>
  {
      private readonly Buffer<TSource, TBufferClosing> _parent;

      // Buffer currently being filled.
      private IList<TSource> _buffer;
      // Guards _buffer and the calls made to the downstream observer.
      private object _gate;
      // Wraps the source and boundary subscriptions; returned from Run.
      private RefCountDisposable _refCountDisposable;

      public Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      /// <summary>Initialises state and subscribes to the source and then to the boundary sequence.</summary>
      /// <returns>The ref-counted disposable wrapping both subscriptions.</returns>
      public IDisposable Run()
      {
          _buffer = new List<TSource>();
          _gate = new object();

          var subscriptions = new CompositeDisposable(2);
          _refCountDisposable = new RefCountDisposable(subscriptions);

          subscriptions.Add(_parent._source.SubscribeSafe(this));
          subscriptions.Add(_parent._bufferBoundaries.SubscribeSafe(new Omega(this)));

          return _refCountDisposable;
      }

      /// <summary>Observer of the boundary sequence; errors and completion are routed to the owning sink.</summary>
      class Omega : IObserver<TBufferClosing>
      {
          private readonly Beta _parent;

          public Omega(Beta parent)
          {
              _parent = parent;
          }

          /// <summary>Emits the owning sink's current buffer and replaces it with a new one.</summary>
          public void OnNext(TBufferClosing value)
          {
              lock (_parent._gate)
              {
                  var finished = _parent._buffer;
                  _parent._buffer = new List<TSource>();
                  _parent._observer.OnNext(finished);
              }
          }

          public void OnError(Exception error)
          {
              _parent.OnError(error);
          }

          public void OnCompleted()
          {
              _parent.OnCompleted();
          }
      }

      /// <summary>Appends the element to the current buffer.</summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              _buffer.Add(value);
          }
      }

      /// <summary>Clears the current buffer, forwards the error and disposes the sink.</summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              _buffer.Clear();

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>Emits the current buffer, then completes and disposes the sink.</summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              base._observer.OnNext(_buffer);

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  583. }
  584. }
  585. #endif