1
0

Delay.cs 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System.Collections.Generic;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. #if NO_SEMAPHORE
  8. using System.Reactive.Threading;
  9. #endif
  10. namespace System.Reactive.Linq.Observαble
  11. {
  12. class Delay<TSource> : Producer<TSource>
  13. {
  14. private readonly IObservable<TSource> _source;
  15. private readonly TimeSpan? _dueTimeR;
  16. private readonly DateTimeOffset? _dueTimeA;
  17. private readonly IScheduler _scheduler;
  /// <summary>
  /// Creates a delay operator that uses a relative due time.
  /// </summary>
  /// <param name="source">Sequence whose notifications are to be delayed.</param>
  /// <param name="dueTime">Relative time span by which notifications are shifted.</param>
  /// <param name="scheduler">Scheduler used to time and dispatch the delayed notifications.</param>
  public Delay(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  {
      // Only the relative due time is populated here; _dueTimeA stays null, which is
      // what the sinks test (via HasValue) to select relative-time mode.
      this._scheduler = scheduler;
      this._dueTimeR = dueTime;
      this._source = source;
  }
  /// <summary>
  /// Creates a delay operator that uses an absolute due time.
  /// </summary>
  /// <param name="source">Sequence whose notifications are to be delayed.</param>
  /// <param name="dueTime">Absolute time at which the sinks schedule their <c>Start</c> step.</param>
  /// <param name="scheduler">Scheduler used to time and dispatch the delayed notifications.</param>
  public Delay(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  {
      // Only the absolute due time is populated here; _dueTimeR stays null. The sinks
      // check _dueTimeA.HasValue to select absolute-time mode.
      this._scheduler = scheduler;
      this._dueTimeA = dueTime;
      this._source = source;
  }
  /// <summary>
  /// Creates the sink for a subscription, registers it through <paramref name="setSink"/> and runs it.
  /// The λ sink is used when the scheduler exposes long-running support
  /// (<c>AsLongRunning()</c> returns non-null); otherwise the recursively scheduled <c>_</c> sink is used.
  /// </summary>
  /// <param name="observer">Observer receiving the delayed notifications.</param>
  /// <param name="cancel">Disposable passed through to the sink's base constructor.</param>
  /// <param name="setSink">Callback that receives the created sink.</param>
  /// <returns>The disposable returned by the selected sink's <c>Run</c> method.</returns>
  protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      var supportsLongRunning = _scheduler.AsLongRunning() != null;

      if (!supportsLongRunning)
      {
          var recursiveSink = new _(this, observer, cancel);
          setSink(recursiveSink);
          return recursiveSink.Run();
      }

      var longRunningSink = new λ(this, observer, cancel);
      setSink(longRunningSink);
      return longRunningSink.Run();
  }
  /// <summary>
  /// Sink selected when the scheduler has no long-running support. Incoming values are
  /// queued together with the stopwatch time at which they become due, and are dispatched
  /// by <see cref="DrainQueue"/>, which reschedules itself recursively on the scheduler.
  /// </summary>
  class _ : Sink<TSource>, IObserver<TSource>
  {
      private readonly Delay<TSource> _parent;

      public _(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      private IScheduler _scheduler;
      private IDisposable _sourceSubscription;
      private SerialDisposable _cancelable;
      private TimeSpan _delay;
      private IStopwatch _watch;

      private object _gate;
      // True once _delay is final: immediately in relative mode, after Start() in absolute mode.
      private bool _ready;
      // True while a drain has been requested/scheduled and has not yet found the queue idle.
      private bool _active;
      // True while DrainQueue is inside its dispatch loop; OnError uses it to decide who reports the error.
      private bool _running;
      private Queue<System.Reactive.TimeInterval<TSource>> _queue;

      private bool _hasCompleted;
      private TimeSpan _completeAt;
      private bool _hasFailed;
      private Exception _exception;

      /// <summary>
      /// Initializes the sink state, arranges the absolute-time start (if any) and subscribes to the source.
      /// </summary>
      /// <returns>A composite of the source subscription and the scheduler work.</returns>
      public IDisposable Run()
      {
          _scheduler = _parent._scheduler;

          _cancelable = new SerialDisposable();

          _gate = new object();
          _active = false;
          _running = false;
          _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
          _hasCompleted = false;
          _completeAt = default(TimeSpan);
          _hasFailed = false;
          _exception = default(Exception);

          _watch = _scheduler.StartStopwatch();

          if (_parent._dueTimeA.HasValue)
          {
              // Absolute mode: the offset is unknown until the due time is reached, so
              // notifications are only buffered until Start() runs.
              _ready = false;

              var dueTimeA = _parent._dueTimeA.Value;
              _cancelable.Disposable = _scheduler.Schedule(dueTimeA, Start);
          }
          else
          {
              _ready = true;

              var dueTimeR = _parent._dueTimeR.Value;
              _delay = Scheduler.Normalize(dueTimeR);
          }

          // Assign through a SingleAssignmentDisposable so that _sourceSubscription is
          // already set if the source calls back during SubscribeSafe.
          var sourceSubscription = new SingleAssignmentDisposable();
          _sourceSubscription = sourceSubscription;
          sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);

          return new CompositeDisposable(_sourceSubscription, _cancelable);
      }

      /// <summary>
      /// Runs at the absolute due time: fixes <c>_delay</c> to the elapsed stopwatch time,
      /// shifts every buffered item by that amount and schedules the first drain.
      /// </summary>
      private void Start()
      {
          var next = default(TimeSpan);
          var shouldRun = false;

          lock (_gate)
          {
              _delay = _watch.Elapsed;

              var oldQueue = _queue;
              _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

              if (oldQueue.Count > 0)
              {
                  // The first item was stamped with a zero delay, so its original
                  // timestamp is exactly how long to wait from now.
                  next = oldQueue.Peek().Interval;

                  while (oldQueue.Count > 0)
                  {
                      var item = oldQueue.Dequeue();
                      _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
                  }

                  shouldRun = true;
                  _active = true;
              }
              else if (_hasCompleted && !_hasFailed)
              {
                  // The source completed before the due time without any buffered item.
                  // OnCompleted could not schedule a drain because _ready was still false,
                  // so it has to be scheduled here or the completion is never forwarded.
                  // 'next' stays zero: DrainQueue compares _completeAt against the clock itself.
                  shouldRun = true;
                  _active = true;
              }

              _ready = true;
          }

          if (shouldRun)
          {
              _cancelable.Disposable = _scheduler.Schedule(next, DrainQueue);
          }
      }

      /// <summary>
      /// Queues the value with its due time and schedules a drain if none is active.
      /// </summary>
      public void OnNext(TSource value)
      {
          var next = _watch.Elapsed.Add(_delay);
          var shouldRun = false;

          lock (_gate)
          {
              _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));

              shouldRun = _ready && !_active;
              _active = true;
          }

          if (shouldRun)
          {
              _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
          }
      }

      /// <summary>
      /// Drops all queued values and records the error. The error is forwarded here
      /// unless a drain loop is currently running, in which case that loop reports it.
      /// </summary>
      public void OnError(Exception error)
      {
          _sourceSubscription.Dispose();

          var shouldRun = false;

          lock (_gate)
          {
              _queue.Clear();

              _exception = error;
              _hasFailed = true;

              shouldRun = !_running;
          }

          if (shouldRun)
          {
              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Records the time at which completion becomes due and schedules a drain if none is active.
      /// </summary>
      public void OnCompleted()
      {
          _sourceSubscription.Dispose();

          var next = _watch.Elapsed.Add(_delay);
          var shouldRun = false;

          lock (_gate)
          {
              _completeAt = next;
              _hasCompleted = true;

              shouldRun = _ready && !_active;
              _active = true;
          }

          if (shouldRun)
          {
              _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
          }
      }

      /// <summary>
      /// Dispatches at most one due value per invocation, then either reschedules itself
      /// for the next due time, forwards a terminal notification, or goes idle.
      /// </summary>
      /// <param name="recurse">Callback that reschedules this method after the given time span.</param>
      private void DrainQueue(Action<TimeSpan> recurse)
      {
          lock (_gate)
          {
              if (_hasFailed)
                  return;
              _running = true;
          }

          //
          // The shouldYield flag was added to address TFS 487881: "Delay can be unfair". In the old
          // implementation, the loop below kept running while there was work for immediate dispatch,
          // potentially causing a long running work item on the target scheduler. With the addition
          // of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
          // interface and perform different processing (see λ). To reduce the code churn in the old
          // loop code here, we set the shouldYield flag to true after the first dispatch iteration,
          // in order to break from the loop and enter the recursive scheduling path.
          //
          var shouldYield = false;

          while (true)
          {
              var hasFailed = false;
              var error = default(Exception);

              var hasValue = false;
              var value = default(TSource);
              var hasCompleted = false;

              var shouldRecurse = false;
              var recurseDueTime = default(TimeSpan);

              lock (_gate)
              {
                  if (_hasFailed)
                  {
                      error = _exception;
                      hasFailed = true;
                      _running = false;
                  }
                  else
                  {
                      var now = _watch.Elapsed;

                      if (_queue.Count > 0)
                      {
                          var nextDue = _queue.Peek().Interval;

                          if (nextDue.CompareTo(now) <= 0 && !shouldYield)
                          {
                              value = _queue.Dequeue().Value;
                              hasValue = true;
                          }
                          else
                          {
                              shouldRecurse = true;
                              recurseDueTime = Scheduler.Normalize(nextDue.Subtract(now));
                              _running = false;
                          }
                      }
                      else if (_hasCompleted)
                      {
                          if (_completeAt.CompareTo(now) <= 0 && !shouldYield)
                          {
                              hasCompleted = true;
                          }
                          else
                          {
                              shouldRecurse = true;
                              recurseDueTime = Scheduler.Normalize(_completeAt.Subtract(now));
                              _running = false;
                          }
                      }
                      else
                      {
                          // Nothing left to do; the next OnNext/OnCompleted schedules a new drain.
                          _running = false;
                          _active = false;
                      }
                  }
              } /* lock (_gate) */

              if (hasValue)
              {
                  base._observer.OnNext(value);
                  shouldYield = true;
              }
              else
              {
                  if (hasCompleted)
                  {
                      base._observer.OnCompleted();
                      base.Dispose();
                  }
                  else if (hasFailed)
                  {
                      base._observer.OnError(error);
                      base.Dispose();
                  }
                  else if (shouldRecurse)
                  {
                      recurse(recurseDueTime);
                  }

                  return;
              }
          } /* while (true) */
      }
  }
  /// <summary>
  /// Sink selected when the scheduler supports long-running work. Incoming notifications are
  /// queued and signalled through a semaphore; a single long-running <see cref="DrainQueue"/>
  /// loop waits for each item's due time and forwards it.
  /// </summary>
  class λ : Sink<TSource>, IObserver<TSource>
  {
      private readonly Delay<TSource> _parent;

      public λ(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      private IDisposable _sourceSubscription;
      private SerialDisposable _cancelable;
      private TimeSpan _delay;
      private IStopwatch _watch;

      private object _gate;
#if !NO_CDS
      private SemaphoreSlim _evt;
      private CancellationTokenSource _stop;
#else
      private Semaphore _evt;
      private bool _stopped;
      private ManualResetEvent _stop;
#endif
      private Queue<System.Reactive.TimeInterval<TSource>> _queue;
      private bool _hasCompleted;
      private TimeSpan _completeAt;
      private bool _hasFailed;
      private Exception _exception;

      /// <summary>
      /// Initializes the sink state, starts the drain loop (immediately in relative mode, at the
      /// due time in absolute mode) and subscribes to the source.
      /// </summary>
      /// <returns>A composite of the source subscription and the drain-loop cancellation.</returns>
      public IDisposable Run()
      {
          _cancelable = new SerialDisposable();

          _gate = new object();
#if !NO_CDS
          _evt = new SemaphoreSlim(0);
#else
          _evt = new Semaphore(0, int.MaxValue);
#endif
          _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
          _hasCompleted = false;
          _completeAt = default(TimeSpan);
          _hasFailed = false;
          _exception = default(Exception);

          _watch = _parent._scheduler.StartStopwatch();

          if (_parent._dueTimeA.HasValue)
          {
              var dueTimeA = _parent._dueTimeA.Value;
              _cancelable.Disposable = _parent._scheduler.Schedule(dueTimeA, Start);
          }
          else
          {
              var dueTimeR = _parent._dueTimeR.Value;
              _delay = Scheduler.Normalize(dueTimeR);
              ScheduleDrain();
          }

          // Assign through a SingleAssignmentDisposable so that _sourceSubscription is
          // already set if the source calls back during SubscribeSafe.
          var sourceSubscription = new SingleAssignmentDisposable();
          _sourceSubscription = sourceSubscription;
          sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);

          return new CompositeDisposable(_sourceSubscription, _cancelable);
      }

      /// <summary>
      /// Runs at the absolute due time: fixes <c>_delay</c> to the elapsed stopwatch time,
      /// shifts every buffered item by that amount and starts the drain loop.
      /// </summary>
      private void Start()
      {
          lock (_gate)
          {
              _delay = _watch.Elapsed;

              var oldQueue = _queue;
              _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

              while (oldQueue.Count > 0)
              {
                  var item = oldQueue.Dequeue();
                  _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
              }
          }

          ScheduleDrain();
      }

      /// <summary>
      /// Installs the stop signal as the current cancelable and starts <see cref="DrainQueue"/>
      /// as long-running work on the scheduler.
      /// </summary>
      private void ScheduleDrain()
      {
#if !NO_CDS
          _stop = new CancellationTokenSource();
          _cancelable.Disposable = Disposable.Create(() => _stop.Cancel());
#else
          _stop = new ManualResetEvent(false);
          _cancelable.Disposable = Disposable.Create(() =>
          {
              _stopped = true;
              _stop.Set();
              _evt.Release();
          });
#endif

          _parent._scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
      }

      /// <summary>
      /// Queues the value with its due time and signals the drain loop.
      /// </summary>
      public void OnNext(TSource value)
      {
          var next = _watch.Elapsed.Add(_delay);

          lock (_gate)
          {
              _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));

              _evt.Release();
          }
      }

      /// <summary>
      /// Drops all queued values, records the error and signals the drain loop, which reports it.
      /// </summary>
      public void OnError(Exception error)
      {
          _sourceSubscription.Dispose();

          lock (_gate)
          {
              _queue.Clear();

              _exception = error;
              _hasFailed = true;

              _evt.Release();
          }
      }

      /// <summary>
      /// Records the time at which completion becomes due and signals the drain loop.
      /// </summary>
      public void OnCompleted()
      {
          _sourceSubscription.Dispose();

          var next = _watch.Elapsed.Add(_delay);

          lock (_gate)
          {
              _completeAt = next;
              _hasCompleted = true;

              _evt.Release();
          }
      }

      /// <summary>
      /// Long-running loop: waits for a signal, takes the next notification, waits until it is due
      /// and forwards it. Returns after a terminal notification or when the stop signal is raised.
      /// </summary>
      /// <param name="cancel">
      /// Cancellation handle supplied by the long-running scheduler. It is not consulted here;
      /// cancellation is observed through <c>_stop</c>.
      /// </param>
      private void DrainQueue(ICancelable cancel)
      {
          while (true)
          {
#if !NO_CDS
              try
              {
                  _evt.Wait(_stop.Token);
              }
              catch (OperationCanceledException)
              {
                  return;
              }
#else
              _evt.WaitOne();
              if (_stopped)
                  return;
#endif

              var hasFailed = false;
              var error = default(Exception);

              var hasValue = false;
              var value = default(TSource);
              var hasCompleted = false;

              var shouldWait = false;
              var waitTime = default(TimeSpan);

              lock (_gate)
              {
                  if (_hasFailed)
                  {
                      error = _exception;
                      hasFailed = true;
                  }
                  else
                  {
                      var now = _watch.Elapsed;

                      if (_queue.Count > 0)
                      {
                          var next = _queue.Dequeue();

                          hasValue = true;
                          value = next.Value;

                          var nextDue = next.Interval;
                          if (nextDue.CompareTo(now) > 0)
                          {
                              shouldWait = true;
                              waitTime = Scheduler.Normalize(nextDue.Subtract(now));
                          }
                      }
                      else if (_hasCompleted)
                      {
                          hasCompleted = true;

                          if (_completeAt.CompareTo(now) > 0)
                          {
                              shouldWait = true;
                              waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
                          }
                      }
                  }
              } /* lock (_gate) */

              if (shouldWait)
              {
                  // Keep the handle of the scheduled wake-up so that it can be cancelled when the
                  // loop is stopped mid-wait; otherwise the work item stays queued on the
                  // scheduler until its due time.
#if !NO_CDS
                  var timer = new ManualResetEventSlim();
                  var timerWork = _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });

                  try
                  {
                      timer.Wait(_stop.Token);
                  }
                  catch (OperationCanceledException)
                  {
                      timerWork.Dispose();
                      return;
                  }
#else
                  var timer = new ManualResetEvent(false);
                  var timerWork = _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
                  if (WaitHandle.WaitAny(new[] { timer, _stop }) == 1)
                  {
                      timerWork.Dispose();
                      return;
                  }
#endif
              }

              if (hasValue)
              {
                  base._observer.OnNext(value);
              }
              else
              {
                  if (hasCompleted)
                  {
                      base._observer.OnCompleted();
                      base.Dispose();
                  }
                  else if (hasFailed)
                  {
                      base._observer.OnError(error);
                      base.Dispose();
                  }

                  return;
              }
          }
      }
  }
  490. }
  491. class Delay<TSource, TDelay> : Producer<TSource>
  492. {
  493. private readonly IObservable<TSource> _source;
  494. private readonly IObservable<TDelay> _subscriptionDelay;
  495. private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
  /// <summary>
  /// Creates a delay operator whose per-element delay is given by an observable sequence.
  /// </summary>
  /// <param name="source">Sequence whose elements are to be delayed.</param>
  /// <param name="subscriptionDelay">
  /// Sequence that gates the subscription to <paramref name="source"/>; when null, the sink
  /// subscribes to the source immediately.
  /// </param>
  /// <param name="delaySelector">Produces, for each element, the sequence whose signal releases that element.</param>
  public Delay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
  {
      this._delaySelector = delaySelector;
      this._subscriptionDelay = subscriptionDelay;
      this._source = source;
  }
  /// <summary>
  /// Creates the sink for a subscription, registers it through <paramref name="setSink"/> and runs it.
  /// </summary>
  /// <param name="observer">Observer receiving the delayed elements.</param>
  /// <param name="cancel">Disposable passed through to the sink's base constructor.</param>
  /// <param name="setSink">Callback that receives the created sink.</param>
  /// <returns>The disposable returned by the sink's <c>Run</c> method.</returns>
  protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      var delaySink = new _(this, observer, cancel);
      setSink(delaySink);

      var subscription = delaySink.Run();
      return subscription;
  }
  /// <summary>
  /// Sink for the selector-based delay. Each source element is held until the delay sequence
  /// returned by the selector signals (OnNext or OnCompleted), and is then forwarded. The
  /// subscription to the source itself can be gated by an optional subscription-delay sequence.
  /// </summary>
  class _ : Sink<TSource>, IObserver<TSource>
  {
      private readonly Delay<TSource, TDelay> _parent;

      public _(Delay<TSource, TDelay> parent, IObserver<TSource> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // Subscriptions to the per-element delay sequences that have not signalled yet.
      private CompositeDisposable _delays;
      // Serializes calls into the downstream observer.
      private object _gate;
      // Set once the source has completed; completion is forwarded when _delays is empty.
      private bool _atEnd;
      // Holds the subscription-delay subscription first, then the source subscription.
      private SerialDisposable _subscription;

      /// <summary>
      /// Initializes the sink state and subscribes either to the subscription-delay sequence
      /// or, when there is none, directly to the source.
      /// </summary>
      /// <returns>A composite of the current upstream subscription and all pending delay subscriptions.</returns>
      public IDisposable Run()
      {
          _delays = new CompositeDisposable();

          _gate = new object();
          _atEnd = false;

          _subscription = new SerialDisposable();

          if (_parent._subscriptionDelay == null)
          {
              Start();
          }
          else
          {
              // Install a placeholder before subscribing. If the subscription delay signals
              // during SubscribeSafe, Start() replaces (and thereby disposes) the placeholder,
              // and the late assignment below just disposes the delay subscription instead of
              // overwriting the source subscription that Start() stored in _subscription.
              var delaySubscription = new SingleAssignmentDisposable();
              _subscription.Disposable = delaySubscription;
              delaySubscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new σ(this));
          }

          return new CompositeDisposable(_subscription, _delays);
      }

      /// <summary>
      /// Subscribes to the source, replacing whatever <c>_subscription</c> currently holds.
      /// </summary>
      private void Start()
      {
          _subscription.Disposable = _parent._source.SubscribeSafe(this);
      }

      /// <summary>
      /// Obtains the delay sequence for the element and subscribes to it; a selector failure
      /// is forwarded as an error.
      /// </summary>
      public void OnNext(TSource value)
      {
          var delay = default(IObservable<TDelay>);
          try
          {
              delay = _parent._delaySelector(value);
          }
          catch (Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }

              return;
          }

          // Register the handle before subscribing so that δ can remove it even when the
          // delay sequence signals during SubscribeSafe.
          var d = new SingleAssignmentDisposable();
          _delays.Add(d);
          d.Disposable = delay.SubscribeSafe(new δ(this, value, d));
      }

      /// <summary>
      /// Forwards a source error immediately, without waiting for pending delays.
      /// </summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Marks the source as finished and completes downstream if no delays are pending.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              _atEnd = true;
              _subscription.Dispose();

              CheckDone();
          }
      }

      /// <summary>
      /// Forwards completion once the source has ended and every pending delay has fired.
      /// Callers in this class invoke it while holding <c>_gate</c>.
      /// </summary>
      private void CheckDone()
      {
          if (_atEnd && _delays.Count == 0)
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }

      /// <summary>
      /// Observer for the subscription-delay sequence: its first OnNext or OnCompleted
      /// triggers the subscription to the source.
      /// </summary>
      class σ : IObserver<TDelay>
      {
          private readonly _ _parent;
          // Ensures the source is subscribed at most once when OnNext is followed by
          // OnCompleted before this observer has been unsubscribed.
          private bool _started;

          public σ(_ parent)
          {
              _parent = parent;
          }

          public void OnNext(TDelay value)
          {
              StartOnce();
          }

          public void OnError(Exception error)
          {
              _parent._observer.OnError(error);
              _parent.Dispose();
          }

          public void OnCompleted()
          {
              StartOnce();
          }

          private void StartOnce()
          {
              if (_started)
              {
                  return;
              }

              _started = true;
              _parent.Start();
          }
      }

      /// <summary>
      /// Observer for one element's delay sequence: its first OnNext or OnCompleted
      /// releases the element downstream; an error is forwarded as-is.
      /// </summary>
      class δ : IObserver<TDelay>
      {
          private readonly _ _parent;
          private readonly TSource _value;
          private readonly IDisposable _self;
          // Guarded by _parent._gate. Prevents the element from being forwarded twice when
          // OnNext is followed by OnCompleted before _self has been unsubscribed.
          private bool _released;

          public δ(_ parent, TSource value, IDisposable self)
          {
              _parent = parent;
              _value = value;
              _self = self;
          }

          public void OnNext(TDelay value)
          {
              Release();
          }

          public void OnError(Exception error)
          {
              lock (_parent._gate)
              {
                  _parent._observer.OnError(error);
                  _parent.Dispose();
              }
          }

          public void OnCompleted()
          {
              Release();
          }

          private void Release()
          {
              lock (_parent._gate)
              {
                  if (_released)
                  {
                      return;
                  }

                  _released = true;

                  _parent._observer.OnNext(_value);

                  _parent._delays.Remove(_self);
                  _parent.CheckDone();
              }
          }
      }
  }
  645. }
  646. }
  647. #endif