Window.cs 23 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Subjects;
  9. using System.Threading;
  10. namespace System.Reactive.Linq.ObservableImpl
  11. {
  12. class Window<TSource> : Producer<IObservable<TSource>>
  13. {
  /// <summary>Sequence whose elements are forwarded into the windows.</summary>
  private readonly IObservable<TSource> _source;

  /// <summary>Number of elements after which a window is closed; not assigned by the timeSpan/timeShift constructor.</summary>
  private readonly int _count;

  /// <summary>Number of elements between the openings of consecutive count-based windows.</summary>
  private readonly int _skip;

  /// <summary>Duration of a time-based window.</summary>
  private readonly TimeSpan _timeSpan;

  /// <summary>Interval between the openings of consecutive time-based windows.</summary>
  private readonly TimeSpan _timeShift;

  /// <summary>Scheduler used to run window timers; remains <c>null</c> when the count/skip constructor is used.</summary>
  private readonly IScheduler _scheduler;

  /// <summary>
  /// Creates a producer for count-based windows.
  /// </summary>
  /// <param name="source">Sequence to split into windows.</param>
  /// <param name="count">Number of elements each window receives before it is completed.</param>
  /// <param name="skip">Number of elements between the starts of consecutive windows.</param>
  public Window(IObservable<TSource> source, int count, int skip)
  {
      _skip = skip;
      _count = count;
      _source = source;
  }
  /// <summary>
  /// Creates a producer for time-based windows that are opened every <paramref name="timeShift"/>
  /// and closed <paramref name="timeSpan"/> after they were opened.
  /// </summary>
  /// <param name="source">Sequence to split into windows.</param>
  /// <param name="timeSpan">Duration of each window.</param>
  /// <param name="timeShift">Interval between the openings of consecutive windows.</param>
  /// <param name="scheduler">Scheduler on which the window timers are run.</param>
  public Window(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  {
      _scheduler = scheduler;
      _timeShift = timeShift;
      _timeSpan = timeSpan;
      _source = source;
  }
  /// <summary>
  /// Creates a producer for windows that are closed when either <paramref name="count"/> elements
  /// have been received or <paramref name="timeSpan"/> has elapsed, whichever happens first.
  /// </summary>
  /// <param name="source">Sequence to split into windows.</param>
  /// <param name="timeSpan">Maximum duration of a window.</param>
  /// <param name="count">Maximum number of elements in a window.</param>
  /// <param name="scheduler">Scheduler on which the window timers are run.</param>
  public Window(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  {
      _scheduler = scheduler;
      _count = count;
      _timeSpan = timeSpan;
      _source = source;
  }
  /// <summary>
  /// Selects the sink matching the constructor that was used, registers it through
  /// <paramref name="setSink"/> and starts it.
  /// </summary>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="cancel">Disposable handed to the sink.</param>
  /// <param name="setSink">Callback used to publish the created sink.</param>
  /// <returns>The disposable returned by the selected sink's <c>Run</c> method.</returns>
  protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      // No scheduler was supplied: windows are delimited by element counts only.
      if (_scheduler == null)
      {
          var countSink = new _(this, observer, cancel);
          setSink(countSink);
          return countSink.Run();
      }

      // A scheduler and a positive count: windows are bounded by both time and count.
      if (_count > 0)
      {
          var boundedSink = new BoundedWindowImpl(this, observer, cancel);
          setSink(boundedSink);
          return boundedSink.Run();
      }

      // Span equals shift: a single periodic timer is enough.
      if (_timeSpan == _timeShift)
      {
          var periodicSink = new TimeShiftImpl(this, observer, cancel);
          setSink(periodicSink);
          return periodicSink.Run();
      }

      // Span and shift differ: windows are opened and closed on separate timelines.
      var timedSink = new WindowImpl(this, observer, cancel);
      setSink(timedSink);
      return timedSink.Run();
  }
  /// <summary>
  /// Sink for count-based windows. A new window is opened every <c>_skip</c> source elements and
  /// the oldest open window is completed once it has received <c>_count</c> elements.
  /// </summary>
  class _ : Sink<IObservable<TSource>>, IObserver<TSource>
  {
      private readonly Window<TSource> _parent;

      public _(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // Windows that are currently open, oldest first.
      private Queue<ISubject<TSource>> _queue;

      // Number of source elements seen so far. Kept as a 64-bit value: a 32-bit counter wraps
      // (or throws in a checked build) after int.MaxValue elements, after which the "c >= 0"
      // test below would never succeed again and windows would stop being closed.
      private long _n;

      private SingleAssignmentDisposable _m;
      private RefCountDisposable _refCountDisposable;

      /// <summary>
      /// Emits the first window and then subscribes to the source.
      /// </summary>
      /// <returns>The ref-counted disposable wrapping the source subscription.</returns>
      public IDisposable Run()
      {
          _queue = new Queue<ISubject<TSource>>();
          _n = 0;
          _m = new SingleAssignmentDisposable();
          _refCountDisposable = new RefCountDisposable(_m);

          // The first window is handed out before subscribing to the source.
          var firstWindow = CreateWindow();
          base._observer.OnNext(firstWindow);

          _m.Disposable = _parent._source.SubscribeSafe(this);

          return _refCountDisposable;
      }

      /// <summary>
      /// Creates a new subject, enqueues it as an open window and wraps it for handing out.
      /// </summary>
      private IObservable<TSource> CreateWindow()
      {
          var s = new Subject<TSource>();
          _queue.Enqueue(s);
          return new WindowObservable<TSource>(s, _refCountDisposable);
      }

      /// <summary>
      /// Forwards the element to every open window, then closes the oldest window and/or opens a
      /// new one depending on the running element count.
      /// </summary>
      public void OnNext(TSource value)
      {
          foreach (var s in _queue)
          {
              s.OnNext(value);
          }

          // c is non-negative once at least _count elements have been seen; it is a multiple of
          // _skip exactly when the oldest open window has just received its last element.
          var c = _n - _parent._count + 1;
          if (c >= 0 && c % _parent._skip == 0)
          {
              var s = _queue.Dequeue();
              s.OnCompleted();
          }

          _n++;

          if (_n % _parent._skip == 0)
          {
              var newWindow = CreateWindow();
              base._observer.OnNext(newWindow);
          }
      }

      /// <summary>
      /// Propagates the error to all open windows, then to the outer observer, and disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          while (_queue.Count > 0)
          {
              _queue.Dequeue().OnError(error);
          }

          base._observer.OnError(error);
          base.Dispose();
      }

      /// <summary>
      /// Completes all open windows, then the outer observer, and disposes the sink.
      /// </summary>
      public void OnCompleted()
      {
          while (_queue.Count > 0)
          {
              _queue.Dequeue().OnCompleted();
          }

          base._observer.OnCompleted();
          base.Dispose();
      }
  }
  /// <summary>
  /// Sink for time-based windows whose open and close events are tracked on separate
  /// timelines; the enclosing producer selects it when the span and the shift differ.
  /// </summary>
  class WindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>
  {
      private readonly Window<TSource> _parent;

      public WindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // Time of the most recently scheduled timer event, measured from the start of the sink.
      private TimeSpan _totalTime;
      // Time at which the next window is to be opened.
      private TimeSpan _nextShift;
      // Time at which the oldest open window is to be closed.
      private TimeSpan _nextSpan;
      // Guards the window queue and the calls made to the outer observer.
      private object _gate;
      // Windows that are currently open, oldest first.
      private Queue<ISubject<TSource>> _q;
      // Holds the currently pending timer.
      private SerialDisposable _timerD;
      private RefCountDisposable _refCountDisposable;

      /// <summary>
      /// Emits the first window, schedules the first timer and then subscribes to the source.
      /// </summary>
      /// <returns>The ref-counted disposable wrapping the timer and the source subscription.</returns>
      public IDisposable Run()
      {
          _totalTime = TimeSpan.Zero;
          _nextShift = _parent._timeShift;
          _nextSpan = _parent._timeSpan;

          _gate = new object();
          _q = new Queue<ISubject<TSource>>();
          _timerD = new SerialDisposable();

          var resources = new CompositeDisposable(2);
          resources.Add(_timerD);
          _refCountDisposable = new RefCountDisposable(resources);

          CreateWindow();
          CreateTimer();

          resources.Add(_parent._source.SubscribeSafe(this));

          return _refCountDisposable;
      }

      /// <summary>
      /// Opens a new window: enqueues a fresh subject and hands its wrapper to the outer observer.
      /// </summary>
      private void CreateWindow()
      {
          var window = new Subject<TSource>();
          _q.Enqueue(window);
          base._observer.OnNext(new WindowObservable<TSource>(window, _refCountDisposable));
      }

      /// <summary>
      /// Schedules the timer for whichever event (window close, window open, or both) is due next.
      /// </summary>
      private void CreateTimer()
      {
          var timer = new SingleAssignmentDisposable();
          _timerD.Disposable = timer;

          // The earlier of the two pending events fires next; on a tie both fire together.
          var closesWindow = _nextSpan <= _nextShift;
          var opensWindow = _nextShift <= _nextSpan;

          var dueAt = closesWindow ? _nextSpan : _nextShift;
          var delay = dueAt - _totalTime;
          _totalTime = dueAt;

          // Both timelines advance by the shift, so that windows keep a constant duration.
          if (closesWindow)
          {
              _nextSpan += _parent._timeShift;
          }

          if (opensWindow)
          {
              _nextShift += _parent._timeShift;
          }

          var state = new State { isSpan = closesWindow, isShift = opensWindow };
          timer.Disposable = _parent._scheduler.Schedule(state, delay, Tick);
      }

      /// <summary>Describes which events a scheduled tick has to perform.</summary>
      struct State
      {
          // True when the tick has to close the oldest window.
          public bool isSpan;
          // True when the tick has to open a new window.
          public bool isShift;
      }

      /// <summary>
      /// Timer callback: closes and/or opens a window as described by <paramref name="state"/>
      /// and schedules the next timer.
      /// </summary>
      private IDisposable Tick(IScheduler self, State state)
      {
          lock (_gate)
          {
              //
              // BREAKING CHANGE v2 > v1.x - Making behavior of sending OnCompleted to the window
              //                             before sending out a new window consistent across all
              //                             overloads of Window and Buffer. Before v2, the two
              //                             operations below were reversed.
              //
              if (state.isSpan)
              {
                  var expired = _q.Dequeue();
                  expired.OnCompleted();
              }

              if (state.isShift)
              {
                  CreateWindow();
              }
          }

          // The next timer is scheduled outside of the lock.
          CreateTimer();

          return Disposable.Empty;
      }

      /// <summary>Forwards the element to every window that is currently open.</summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              foreach (var window in _q)
              {
                  window.OnNext(value);
              }
          }
      }

      /// <summary>
      /// Propagates the error to every open window, then to the outer observer, and disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              foreach (var window in _q)
              {
                  window.OnError(error);
              }

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Completes every open window, then the outer observer, and disposes the sink.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              foreach (var window in _q)
              {
                  window.OnCompleted();
              }

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  /// <summary>
  /// Sink for time-based windows driven by a single periodic timer: every tick completes the
  /// current window and opens the next one. The enclosing producer selects it when the span
  /// equals the shift.
  /// </summary>
  class TimeShiftImpl : Sink<IObservable<TSource>>, IObserver<TSource>
  {
      private readonly Window<TSource> _parent;

      public TimeShiftImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // Guards the current window and the calls made to the outer observer.
      private object _gate;
      // The window that is currently open.
      private Subject<TSource> _subject;
      private RefCountDisposable _refCountDisposable;

      /// <summary>
      /// Emits the first window, starts the periodic timer and then subscribes to the source.
      /// </summary>
      /// <returns>The ref-counted disposable wrapping the timer and the source subscription.</returns>
      public IDisposable Run()
      {
          _gate = new object();

          var resources = new CompositeDisposable(2);
          _refCountDisposable = new RefCountDisposable(resources);

          CreateWindow();

          var timer = _parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick);
          resources.Add(timer);

          var subscription = _parent._source.SubscribeSafe(this);
          resources.Add(subscription);

          return _refCountDisposable;
      }

      /// <summary>Timer callback: completes the current window and opens the next one.</summary>
      private void Tick()
      {
          lock (_gate)
          {
              _subject.OnCompleted();
              CreateWindow();
          }
      }

      /// <summary>
      /// Replaces the current window with a fresh subject and hands its wrapper to the outer observer.
      /// </summary>
      private void CreateWindow()
      {
          _subject = new Subject<TSource>();

          var window = new WindowObservable<TSource>(_subject, _refCountDisposable);
          base._observer.OnNext(window);
      }

      /// <summary>Forwards the element to the current window.</summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              _subject.OnNext(value);
          }
      }

      /// <summary>
      /// Propagates the error to the current window, then to the outer observer, and disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              _subject.OnError(error);

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Completes the current window, then the outer observer, and disposes the sink.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              _subject.OnCompleted();

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  /// <summary>
  /// Sink for windows bounded by both time and element count: the current window is replaced
  /// when its timer fires or when it has received <c>_count</c> elements, whichever comes first.
  /// </summary>
  class BoundedWindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>
  {
      private readonly Window<TSource> _parent;

      public BoundedWindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // Guards the fields below and the calls made to the outer observer.
      private object _gate;
      // The window that is currently open.
      private ISubject<TSource> _s;
      // Number of elements delivered to the current window.
      private int _n;
      // Identifier of the current window; lets a timer detect that its window is already gone.
      private int _windowId;
      // Holds the timer of the current window.
      private SerialDisposable _timerD;
      private RefCountDisposable _refCountDisposable;

      /// <summary>
      /// Emits the first window, starts its timer and then subscribes to the source.
      /// </summary>
      /// <returns>The ref-counted disposable wrapping the timer and the source subscription.</returns>
      public IDisposable Run()
      {
          _gate = new object();
          _n = 0;
          _windowId = 0;
          _timerD = new SerialDisposable();

          var resources = new CompositeDisposable(2);
          resources.Add(_timerD);
          _refCountDisposable = new RefCountDisposable(resources);

          _s = new Subject<TSource>();
          base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));

          CreateTimer(0);

          resources.Add(_parent._source.SubscribeSafe(this));

          return _refCountDisposable;
      }

      /// <summary>
      /// Schedules the timer for the window identified by <paramref name="id"/>, replacing any
      /// timer held before.
      /// </summary>
      private void CreateTimer(int id)
      {
          var timer = new SingleAssignmentDisposable();
          _timerD.Disposable = timer;

          timer.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
      }

      /// <summary>
      /// Completes the current window, opens the next one and hands it to the outer observer.
      /// Callers must hold <c>_gate</c>.
      /// </summary>
      /// <returns>The identifier assigned to the newly opened window.</returns>
      private int StartNextWindow()
      {
          _n = 0;
          var nextId = ++_windowId;

          _s.OnCompleted();
          _s = new Subject<TSource>();
          base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));

          return nextId;
      }

      /// <summary>
      /// Timer callback: replaces the window identified by <paramref name="id"/> unless it has
      /// already been replaced, and starts the timer for the new window.
      /// </summary>
      private IDisposable Tick(IScheduler self, int id)
      {
          var result = Disposable.Empty;
          var nextId = 0;

          lock (_gate)
          {
              // The window this timer belongs to was already replaced; nothing to do.
              if (id != _windowId)
              {
                  return result;
              }

              nextId = StartNextWindow();
          }

          // The next timer is scheduled outside of the lock.
          CreateTimer(nextId);

          return result;
      }

      /// <summary>
      /// Forwards the element to the current window and replaces the window once it has
      /// received <c>_count</c> elements.
      /// </summary>
      public void OnNext(TSource value)
      {
          int? nextId = null;

          lock (_gate)
          {
              _s.OnNext(value);
              _n++;

              if (_n == _parent._count)
              {
                  nextId = StartNextWindow();
              }
          }

          // The timer for the new window is scheduled outside of the lock.
          if (nextId.HasValue)
          {
              CreateTimer(nextId.Value);
          }
      }

      /// <summary>
      /// Propagates the error to the current window, then to the outer observer, and disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              _s.OnError(error);

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Completes the current window, then the outer observer, and disposes the sink.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              _s.OnCompleted();

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  400. }
  401. class Window<TSource, TWindowClosing> : Producer<IObservable<TSource>>
  402. {
  /// <summary>Sequence whose elements are forwarded into the windows.</summary>
  private readonly IObservable<TSource> _source;

  /// <summary>Factory for the sequence that signals the end of the current window; <c>null</c> when boundaries are used.</summary>
  private readonly Func<IObservable<TWindowClosing>> _windowClosingSelector;

  /// <summary>Sequence whose elements delimit the windows; <c>null</c> when a closing selector is used.</summary>
  private readonly IObservable<TWindowClosing> _windowBoundaries;

  /// <summary>
  /// Creates a producer whose windows are closed by the sequences obtained from
  /// <paramref name="windowClosingSelector"/>.
  /// </summary>
  /// <param name="source">Sequence to split into windows.</param>
  /// <param name="windowClosingSelector">Function invoked for each window to obtain the sequence that closes it.</param>
  public Window(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)
  {
      _windowClosingSelector = windowClosingSelector;
      _source = source;
  }
  /// <summary>
  /// Creates a producer whose windows are delimited by the elements of
  /// <paramref name="windowBoundaries"/>.
  /// </summary>
  /// <param name="source">Sequence to split into windows.</param>
  /// <param name="windowBoundaries">Sequence whose elements each close the current window and open a new one.</param>
  public Window(IObservable<TSource> source, IObservable<TWindowClosing> windowBoundaries)
  {
      _windowBoundaries = windowBoundaries;
      _source = source;
  }
  /// <summary>
  /// Selects the sink matching the constructor that was used, registers it through
  /// <paramref name="setSink"/> and starts it.
  /// </summary>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="cancel">Disposable handed to the sink.</param>
  /// <param name="setSink">Callback used to publish the created sink.</param>
  /// <returns>The disposable returned by the selected sink's <c>Run</c> method.</returns>
  protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      // Without a closing selector the windows are delimited by the boundaries sequence.
      if (_windowClosingSelector == null)
      {
          var boundarySink = new Beta(this, observer, cancel);
          setSink(boundarySink);
          return boundarySink.Run();
      }

      var selectorSink = new _(this, observer, cancel);
      setSink(selectorSink);
      return selectorSink.Run();
  }
  /// <summary>
  /// Sink for windows closed by a per-window closing sequence: for every window the closing
  /// selector is invoked, and the first notification (element or completion) of the returned
  /// sequence closes the window and opens the next one.
  /// </summary>
  class _ : Sink<IObservable<TSource>>, IObserver<TSource>
  {
      private readonly Window<TSource, TWindowClosing> _parent;

      public _(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // The window that is currently open.
      private ISubject<TSource> _window;
      // Guards the current window and the calls made to the outer observer.
      private object _gate;
      // Queues the calls to CreateWindowClose.
      private AsyncLock _windowGate;
      // Holds the subscription to the closing sequence of the current window.
      private SerialDisposable _m;
      private RefCountDisposable _refCountDisposable;

      /// <summary>
      /// Emits the first window, subscribes to the source and then sets up the first closing sequence.
      /// </summary>
      /// <returns>The ref-counted disposable wrapping the source and closing subscriptions.</returns>
      public IDisposable Run()
      {
          _window = new Subject<TSource>();
          _gate = new object();
          _windowGate = new AsyncLock();

          _m = new SerialDisposable();
          var groupDisposable = new CompositeDisposable(2) { _m };
          _refCountDisposable = new RefCountDisposable(groupDisposable);

          var window = new WindowObservable<TSource>(_window, _refCountDisposable);
          base._observer.OnNext(window);

          groupDisposable.Add(_parent._source.SubscribeSafe(this));

          _windowGate.Wait(CreateWindowClose);

          return _refCountDisposable;
      }

      /// <summary>
      /// Invokes the closing selector and subscribes to the sequence it returns. If the selector
      /// throws, the sink is terminated with that exception.
      /// </summary>
      private void CreateWindowClose()
      {
          var windowClose = default(IObservable<TWindowClosing>);
          try
          {
              windowClose = _parent._windowClosingSelector();
          }
          catch (Exception exception)
          {
              // Go through OnError so that the currently open window is terminated as well;
              // notifying only the outer observer would leave the window's subscribers without
              // any terminal notification. OnError takes _gate itself.
              OnError(exception);
              return;
          }

          var closingSubscription = new SingleAssignmentDisposable();
          _m.Disposable = closingSubscription;
          closingSubscription.Disposable = windowClose.SubscribeSafe(new Omega(this, closingSubscription));
      }

      /// <summary>
      /// Ends the current window: disposes its closing subscription, completes the window, opens
      /// and emits the next one, and requests a closing sequence for it.
      /// </summary>
      /// <param name="closingSubscription">Subscription to the closing sequence that fired.</param>
      private void CloseWindow(IDisposable closingSubscription)
      {
          closingSubscription.Dispose();

          lock (_gate)
          {
              _window.OnCompleted();
              _window = new Subject<TSource>();

              var window = new WindowObservable<TSource>(_window, _refCountDisposable);
              base._observer.OnNext(window);
          }

          _windowGate.Wait(CreateWindowClose);
      }

      /// <summary>
      /// Observer of a closing sequence; elements and completion both close the current window,
      /// errors terminate the sink.
      /// </summary>
      class Omega : IObserver<TWindowClosing>
      {
          private readonly _ _parent;
          private readonly IDisposable _self;

          public Omega(_ parent, IDisposable self)
          {
              _parent = parent;
              _self = self;
          }

          public void OnNext(TWindowClosing value)
          {
              _parent.CloseWindow(_self);
          }

          public void OnError(Exception error)
          {
              _parent.OnError(error);
          }

          public void OnCompleted()
          {
              _parent.CloseWindow(_self);
          }
      }

      /// <summary>Forwards the element to the current window.</summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              _window.OnNext(value);
          }
      }

      /// <summary>
      /// Propagates the error to the current window, then to the outer observer, and disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              _window.OnError(error);

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Completes the current window, then the outer observer, and disposes the sink.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              _window.OnCompleted();

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  /// <summary>
  /// Sink for windows delimited by a boundaries sequence: every boundary element completes the
  /// current window and opens the next one.
  /// </summary>
  class Beta : Sink<IObservable<TSource>>, IObserver<TSource>
  {
      private readonly Window<TSource, TWindowClosing> _parent;

      public Beta(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // The window that is currently open.
      private ISubject<TSource> _window;
      // Guards the current window and the calls made to the outer observer.
      private object _gate;
      private RefCountDisposable _refCountDisposable;

      /// <summary>
      /// Emits the first window, then subscribes to the source and to the boundaries sequence.
      /// </summary>
      /// <returns>The ref-counted disposable wrapping both subscriptions.</returns>
      public IDisposable Run()
      {
          _window = new Subject<TSource>();
          _gate = new object();

          var subscriptions = new CompositeDisposable(2);
          _refCountDisposable = new RefCountDisposable(subscriptions);

          base._observer.OnNext(new WindowObservable<TSource>(_window, _refCountDisposable));

          var sourceSubscription = _parent._source.SubscribeSafe(this);
          subscriptions.Add(sourceSubscription);

          var boundarySubscription = _parent._windowBoundaries.SubscribeSafe(new Omega(this));
          subscriptions.Add(boundarySubscription);

          return _refCountDisposable;
      }

      /// <summary>
      /// Observer of the boundaries sequence; elements roll the window over, terminal
      /// notifications are passed on to the sink.
      /// </summary>
      class Omega : IObserver<TWindowClosing>
      {
          private readonly Beta _parent;

          public Omega(Beta parent)
          {
              _parent = parent;
          }

          public void OnNext(TWindowClosing value)
          {
              lock (_parent._gate)
              {
                  // Complete the current window before handing out the next one.
                  _parent._window.OnCompleted();

                  var next = new Subject<TSource>();
                  _parent._window = next;
                  _parent._observer.OnNext(new WindowObservable<TSource>(next, _parent._refCountDisposable));
              }
          }

          public void OnError(Exception error)
          {
              _parent.OnError(error);
          }

          public void OnCompleted()
          {
              _parent.OnCompleted();
          }
      }

      /// <summary>Forwards the element to the current window.</summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              _window.OnNext(value);
          }
      }

      /// <summary>
      /// Propagates the error to the current window, then to the outer observer, and disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              _window.OnError(error);

              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Completes the current window, then the outer observer, and disposes the sink.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              _window.OnCompleted();

              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  613. }
  /// <summary>
  /// Observable handed out for each window; it passes the window's source and the shared
  /// <c>RefCountDisposable</c> to its <c>AddRef</c> base class.
  /// </summary>
  // NOTE(review): AddRef is defined elsewhere; presumably it ties each window subscription to the ref count - confirm.
  class WindowObservable<TSource> : AddRef<TSource>
  {
      /// <param name="source">Sequence backing the window.</param>
      /// <param name="refCount">Ref-counted disposable shared by all windows of one sink.</param>
      public WindowObservable(IObservable<TSource> source, RefCountDisposable refCount) : base(source, refCount) { }
  }
  621. }
  622. #endif