Delay.cs 24 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System.Collections.Generic;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Threading;
  9. #if NO_SEMAPHORE
  10. using System.Reactive.Threading;
  11. #endif
  12. namespace System.Reactive.Linq.ObservableImpl
  13. {
  14. class Delay<TSource> : Producer<TSource>
  15. {
  16. private readonly IObservable<TSource> _source;
  17. private readonly TimeSpan? _dueTimeR;
  18. private readonly DateTimeOffset? _dueTimeA;
  19. private readonly IScheduler _scheduler;
  /// <summary>
  /// Captures the source sequence, a relative due time and the scheduler.
  /// The absolute due time field is left unset by this overload.
  /// </summary>
  /// <param name="source">Sequence whose notifications are to be delayed.</param>
  /// <param name="dueTime">Relative time span stored as the delay.</param>
  /// <param name="scheduler">Scheduler stored for later use by the sinks.</param>
  public Delay(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  {
      this._scheduler = scheduler;
      this._dueTimeR = dueTime;
      this._source = source;
  }
  /// <summary>
  /// Captures the source sequence, an absolute due time and the scheduler.
  /// The relative due time field is left unset by this overload.
  /// </summary>
  /// <param name="source">Sequence whose notifications are to be delayed.</param>
  /// <param name="dueTime">Absolute point in time stored as the due time.</param>
  /// <param name="scheduler">Scheduler stored for later use by the sinks.</param>
  public Delay(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  {
      this._scheduler = scheduler;
      this._dueTimeA = dueTime;
      this._source = source;
  }
  /// <summary>
  /// Creates the sink for a subscription, registers it through <paramref name="setSink"/>
  /// and starts it. <c>LongRunningImpl</c> is used when the scheduler's
  /// <c>AsLongRunning()</c> returns non-null; otherwise the regular sink is used.
  /// </summary>
  /// <returns>The disposable returned by the selected sink's <c>Run</c>.</returns>
  protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      var supportsLongRunning = _scheduler.AsLongRunning() != null;
      if (!supportsLongRunning)
      {
          var regular = new _(this, observer, cancel);
          setSink(regular);
          return regular.Run();
      }

      var longRunning = new LongRunningImpl(this, observer, cancel);
      setSink(longRunning);
      return longRunning.Run();
  }
  47. class _ : Sink<TSource>, IObserver<TSource>
  48. {
  49. private readonly Delay<TSource> _parent;
  /// <summary>
  /// Creates the sink, passing the observer and cancel handle to the base sink and
  /// remembering the operator it belongs to.
  /// </summary>
  public _(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
      : base(observer, cancel)
  {
      this._parent = parent;
  }
  55. private IScheduler _scheduler;
  56. private IDisposable _sourceSubscription;
  57. private SerialDisposable _cancelable;
  58. private TimeSpan _delay;
  59. private IStopwatch _watch;
  60. private object _gate;
  61. private bool _ready;
  62. private bool _active;
  63. private bool _running;
  64. private Queue<System.Reactive.TimeInterval<TSource>> _queue;
  65. private bool _hasCompleted;
  66. private TimeSpan _completeAt;
  67. private bool _hasFailed;
  68. private Exception _exception;
  /// <summary>
  /// Initializes the sink state, starts the stopwatch and subscribes to the source.
  /// With an absolute due time the sink starts "not ready" and schedules
  /// <c>Start</c> for that time; with a relative due time it is ready immediately and
  /// the normalized time span becomes the delay.
  /// </summary>
  /// <returns>A disposable combining the source subscription and the scheduled work.</returns>
  public IDisposable Run()
  {
      _scheduler = _parent._scheduler;
      _gate = new object();
      _cancelable = new SerialDisposable();
      _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

      _active = false;
      _running = false;
      _hasCompleted = false;
      _completeAt = default(TimeSpan);
      _hasFailed = false;
      _exception = default(Exception);

      // The stopwatch must be running before anything is scheduled or received.
      _watch = _scheduler.StartStopwatch();

      var absoluteDueTime = _parent._dueTimeA;
      _ready = !absoluteDueTime.HasValue;
      if (_ready)
      {
          _delay = Scheduler.Normalize(_parent._dueTimeR.Value);
      }
      else
      {
          _cancelable.Disposable = _scheduler.Schedule(absoluteDueTime.Value, Start);
      }

      // Publish the handle before subscribing so OnError/OnCompleted can dispose it.
      var subscription = new SingleAssignmentDisposable();
      _sourceSubscription = subscription;
      subscription.Disposable = _parent._source.SubscribeSafe(this);

      return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
  }
  /// <summary>
  /// Scheduled by <c>Run</c> for the absolute due time. Takes the elapsed stopwatch
  /// time as the delay, shifts the due time of every item queued so far by that
  /// delay, marks the sink ready and, if there was a backlog, schedules the first drain.
  /// </summary>
  private void Start()
  {
      var firstDue = default(TimeSpan);
      var hasBacklog = false;

      lock (_gate)
      {
          _delay = _watch.Elapsed;

          var backlog = _queue;
          _queue = new Queue<Reactive.TimeInterval<TSource>>();

          hasBacklog = backlog.Count > 0;
          if (hasBacklog)
          {
              // The first item's original (unshifted) timestamp is used as the
              // relative wait for the first drain.
              firstDue = backlog.Peek().Interval;

              foreach (var pending in backlog)
              {
                  _queue.Enqueue(new Reactive.TimeInterval<TSource>(pending.Value, pending.Interval.Add(_delay)));
              }

              _active = true;
          }

          // NOTE(review): when the backlog is empty but OnCompleted already ran
          // (it sets _active while not ready), nothing here schedules DrainQueue -
          // confirm that completion is still delivered in that case.
          _ready = true;
      }

      if (hasBacklog)
      {
          _cancelable.Disposable = _scheduler.Schedule(firstDue, DrainQueue);
      }
  }
  /// <summary>
  /// Queues a value together with its due time (elapsed stopwatch time plus the
  /// delay) and schedules a drain when the sink is ready and no drain is active.
  /// </summary>
  /// <param name="value">The value received from the source.</param>
  public void OnNext(TSource value)
  {
      var shouldRun = false;
      var delay = default(TimeSpan);

      lock (_gate)
      {
          // Read _delay and stamp the item under the gate: Start() changes _delay and
          // rebases the queued items while holding the same lock, so a timestamp
          // computed outside of it could be enqueued with a stale delay.
          delay = _delay;
          var next = _watch.Elapsed.Add(delay);

          _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));

          shouldRun = _ready && !_active;
          _active = true;
      }

      if (shouldRun)
      {
          _cancelable.Disposable = _scheduler.Schedule(delay, DrainQueue);
      }
  }
  /// <summary>
  /// Records the failure, discards all queued values and disposes the source
  /// subscription. The error is forwarded here unless a drain loop is currently
  /// running, in which case that loop picks up the recorded failure.
  /// </summary>
  /// <param name="error">The exception reported by the source.</param>
  public void OnError(Exception error)
  {
      _sourceSubscription.Dispose();

      bool drainIsRunning;
      lock (_gate)
      {
          _queue.Clear();

          _exception = error;
          _hasFailed = true;

          drainIsRunning = _running;
      }

      if (drainIsRunning)
      {
          return;
      }

      base._observer.OnError(error);
      base.Dispose();
  }
  /// <summary>
  /// Disposes the source subscription, records the time at which completion becomes
  /// due (elapsed stopwatch time plus the delay) and schedules a drain when the sink
  /// is ready and no drain is active.
  /// </summary>
  public void OnCompleted()
  {
      _sourceSubscription.Dispose();

      var shouldRun = false;
      var delay = default(TimeSpan);

      lock (_gate)
      {
          // Read _delay under the gate: Start() assigns it while holding the same
          // lock, so computing the completion time outside could use a stale delay.
          delay = _delay;
          _completeAt = _watch.Elapsed.Add(delay);
          _hasCompleted = true;

          shouldRun = _ready && !_active;
          _active = true;
      }

      if (shouldRun)
      {
          _cancelable.Disposable = _scheduler.Schedule(delay, DrainQueue);
      }
  }
  // Scheduled work item for the regular (non long-running) sink. Delivers at most one
  // queued value whose due time (measured with _watch) has passed, or the completion or
  // failure notification, and otherwise asks `recurse` to run this method again after
  // the remaining wait. State is inspected under _gate; the observer is only invoked
  // after the lock has been released.
  175. private void DrainQueue(Action<TimeSpan> recurse)
  176. {
  177. lock (_gate)
  178. {
  // A failure recorded before this pass started is forwarded either by OnError itself
  // (when no drain was running) or by the drain pass that was running at the time.
  179. if (_hasFailed)
  180. return;
  // While _running is set, OnError leaves forwarding of the failure to this loop.
  181. _running = true;
  182. }
  183. //
  184. // The shouldYield flag was added to address TFS 487881: "Delay can be unfair". In the old
  185. // implementation, the loop below kept running while there was work for immediate dispatch,
  186. // potentially causing a long running work item on the target scheduler. With the addition
  187. // of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
  188. // interface and perform different processing (see LongRunningImpl). To reduce the code
  189. // churn in the old loop code here, we set the shouldYield flag to true after the first
  190. // dispatch iteration, in order to break from the loop and enter the recursive scheduling path.
  191. //
  192. var shouldYield = false;
  193. while (true)
  194. {
  // Per-iteration decision, filled in under the lock and acted upon after it.
  195. var hasFailed = false;
  196. var error = default(Exception);
  197. var hasValue = false;
  198. var value = default(TSource);
  199. var hasCompleted = false;
  200. var shouldRecurse = false;
  201. var recurseDueTime = default(TimeSpan);
  202. lock (_gate)
  203. {
  // A recorded failure takes precedence over anything else.
  204. if (_hasFailed)
  205. {
  206. error = _exception;
  207. hasFailed = true;
  208. _running = false;
  209. }
  210. else
  211. {
  212. var now = _watch.Elapsed;
  213. if (_queue.Count > 0)
  214. {
  215. var nextDue = _queue.Peek().Interval;
  // Dequeue only when the head is due and no value was delivered yet in this call.
  216. if (nextDue.CompareTo(now) <= 0 && !shouldYield)
  217. {
  218. value = _queue.Dequeue().Value;
  219. hasValue = true;
  220. }
  221. else
  222. {
  // Head not due yet (or we must yield): come back after the remaining time.
  // NOTE(review): Scheduler.Normalize presumably clamps negative spans to zero - confirm.
  223. shouldRecurse = true;
  224. recurseDueTime = Scheduler.Normalize(nextDue.Subtract(now));
  225. _running = false;
  226. }
  227. }
  228. else if (_hasCompleted)
  229. {
  // Queue is empty and the source has completed: complete now if due, else reschedule.
  230. if (_completeAt.CompareTo(now) <= 0 && !shouldYield)
  231. {
  232. hasCompleted = true;
  233. }
  234. else
  235. {
  236. shouldRecurse = true;
  237. recurseDueTime = Scheduler.Normalize(_completeAt.Subtract(now));
  238. _running = false;
  239. }
  240. }
  241. else
  242. {
  // Nothing to do: go idle. Clearing _active lets the next OnNext/OnCompleted
  // schedule a new drain.
  243. _running = false;
  244. _active = false;
  245. }
  246. }
  247. } /* lock (_gate) */
  248. if (hasValue)
  249. {
  250. base._observer.OnNext(value);
  // From now on every path in the locked section takes the reschedule branch.
  251. shouldYield = true;
  252. }
  253. else
  254. {
  255. if (hasCompleted)
  256. {
  257. base._observer.OnCompleted();
  258. base.Dispose();
  259. }
  260. else if (hasFailed)
  261. {
  262. base._observer.OnError(error);
  263. base.Dispose();
  264. }
  265. else if (shouldRecurse)
  266. {
  267. recurse(recurseDueTime);
  268. }
  // Every non-value outcome ends this invocation.
  269. return;
  270. }
  271. } /* while (true) */
  272. }
  273. }
  274. class LongRunningImpl : Sink<TSource>, IObserver<TSource>
  275. {
  276. private readonly Delay<TSource> _parent;
  /// <summary>
  /// Creates the long-running sink, passing the observer and cancel handle to the
  /// base sink and remembering the operator it belongs to.
  /// </summary>
  public LongRunningImpl(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
      : base(observer, cancel)
  {
      this._parent = parent;
  }
  282. private IDisposable _sourceSubscription;
  283. private SerialDisposable _cancelable;
  284. private TimeSpan _delay;
  285. private IStopwatch _watch;
  286. private object _gate;
  287. #if !NO_CDS
  288. private SemaphoreSlim _evt;
  289. private CancellationTokenSource _stop;
  290. #else
  291. private Semaphore _evt;
  292. private bool _stopped;
  293. private ManualResetEvent _stop;
  294. #endif
  295. private Queue<System.Reactive.TimeInterval<TSource>> _queue;
  296. private bool _hasCompleted;
  297. private TimeSpan _completeAt;
  298. private bool _hasFailed;
  299. private Exception _exception;
  /// <summary>
  /// Initializes the sink state, starts the stopwatch and subscribes to the source.
  /// With an absolute due time <c>Start</c> is scheduled for that time; with a
  /// relative due time the normalized span becomes the delay and the drain loop is
  /// scheduled right away.
  /// </summary>
  /// <returns>A disposable combining the source subscription and the drain cancellation.</returns>
  public IDisposable Run()
  {
      _gate = new object();
      _cancelable = new SerialDisposable();
      _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

  #if !NO_CDS
      _evt = new SemaphoreSlim(0);
  #else
      _evt = new Semaphore(0, int.MaxValue);
  #endif

      _hasFailed = false;
      _exception = default(Exception);
      _hasCompleted = false;
      _completeAt = default(TimeSpan);

      var scheduler = _parent._scheduler;
      _watch = scheduler.StartStopwatch();

      var absoluteDueTime = _parent._dueTimeA;
      if (!absoluteDueTime.HasValue)
      {
          _delay = Scheduler.Normalize(_parent._dueTimeR.Value);
          ScheduleDrain();
      }
      else
      {
          _cancelable.Disposable = scheduler.Schedule(absoluteDueTime.Value, Start);
      }

      // Publish the handle before subscribing so OnError/OnCompleted can dispose it.
      var subscription = new SingleAssignmentDisposable();
      _sourceSubscription = subscription;
      subscription.Disposable = _parent._source.SubscribeSafe(this);

      return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
  }
  /// <summary>
  /// Scheduled by <c>Run</c> for the absolute due time. Takes the elapsed stopwatch
  /// time as the delay, shifts the due time of every queued item by it and then
  /// schedules the drain loop.
  /// </summary>
  private void Start()
  {
      lock (_gate)
      {
          _delay = _watch.Elapsed;

          // Build the shifted queue from the current one, preserving order.
          var rebased = new Queue<Reactive.TimeInterval<TSource>>();
          foreach (var pending in _queue)
          {
              rebased.Enqueue(new Reactive.TimeInterval<TSource>(pending.Value, pending.Interval.Add(_delay)));
          }

          _queue = rebased;
      }

      ScheduleDrain();
  }
  /// <summary>
  /// Installs the stop signal as the current cancelable and hands <c>DrainQueue</c>
  /// to the scheduler's long-running interface.
  /// </summary>
  private void ScheduleDrain()
  {
  #if !NO_CDS
      _stop = new CancellationTokenSource();
      _cancelable.Disposable = Disposable.Create(() => { _stop.Cancel(); });
  #else
      _stop = new ManualResetEvent(false);
      _cancelable.Disposable = Disposable.Create(() =>
      {
          // Flag the stop, signal the stop event and release the semaphore once.
          _stopped = true;
          _stop.Set();
          _evt.Release();
      });
  #endif

      var longRunning = _parent._scheduler.AsLongRunning();
      longRunning.ScheduleLongRunning(DrainQueue);
  }
  /// <summary>
  /// Queues a value together with its due time (elapsed stopwatch time plus the
  /// delay) and releases the semaphore once so the drain loop can pick it up.
  /// </summary>
  /// <param name="value">The value received from the source.</param>
  public void OnNext(TSource value)
  {
      lock (_gate)
      {
          // Stamp the item under the gate: Start() changes _delay and rebases the
          // queued items while holding the same lock, so a timestamp computed outside
          // of it could be enqueued with a stale delay.
          var next = _watch.Elapsed.Add(_delay);

          _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
          _evt.Release();
      }
  }
  /// <summary>
  /// Disposes the source subscription, discards all queued values, records the
  /// failure and releases the semaphore once so the drain loop can forward it.
  /// </summary>
  /// <param name="error">The exception reported by the source.</param>
  public void OnError(Exception error)
  {
      _sourceSubscription.Dispose();

      lock (_gate)
      {
          _hasFailed = true;
          _exception = error;
          _queue.Clear();

          _evt.Release();
      }
  }
  /// <summary>
  /// Disposes the source subscription, records the time at which completion becomes
  /// due (elapsed stopwatch time plus the delay) and releases the semaphore once so
  /// the drain loop can forward the completion.
  /// </summary>
  public void OnCompleted()
  {
      _sourceSubscription.Dispose();

      lock (_gate)
      {
          // Read _delay under the gate: Start() assigns it while holding the same
          // lock, so computing the completion time outside could use a stale delay.
          _completeAt = _watch.Elapsed.Add(_delay);
          _hasCompleted = true;

          _evt.Release();
      }
  }
  // Body of the long-running work item. Each iteration blocks on _evt (released once by
  // every OnNext/OnError/OnCompleted), decides under _gate what to deliver, waits out
  // the remaining delay if the notification is not yet due, and then invokes the
  // observer. The method returns when the stop signal is observed, or after forwarding
  // completion or failure. The `cancel` parameter is not used in this method.
  393. private void DrainQueue(ICancelable cancel)
  394. {
  395. while (true)
  396. {
  397. #if !NO_CDS
  398. try
  399. {
  // Wait for the next signalled notification; cancellation of _stop ends the loop.
  400. _evt.Wait(_stop.Token);
  401. }
  402. catch (OperationCanceledException)
  403. {
  404. return;
  405. }
  406. #else
  // The stop action sets _stopped and releases _evt, so a wake-up may mean "stop".
  407. _evt.WaitOne();
  408. if (_stopped)
  409. return;
  410. #endif
  // Per-iteration decision, filled in under the lock and acted upon after it.
  411. var hasFailed = false;
  412. var error = default(Exception);
  413. var hasValue = false;
  414. var value = default(TSource);
  415. var hasCompleted = false;
  416. var shouldWait = false;
  417. var waitTime = default(TimeSpan);
  418. lock (_gate)
  419. {
  // A recorded failure takes precedence over anything else.
  420. if (_hasFailed)
  421. {
  422. error = _exception;
  423. hasFailed = true;
  424. }
  425. else
  426. {
  427. var now = _watch.Elapsed;
  428. if (_queue.Count > 0)
  429. {
  // The head item is removed right away, even when it is not yet due.
  430. var next = _queue.Dequeue();
  431. hasValue = true;
  432. value = next.Value;
  433. var nextDue = next.Interval;
  434. if (nextDue.CompareTo(now) > 0)
  435. {
  // NOTE(review): Scheduler.Normalize presumably clamps negative spans to zero - confirm.
  436. shouldWait = true;
  437. waitTime = Scheduler.Normalize(nextDue.Subtract(now));
  438. }
  439. }
  440. else if (_hasCompleted)
  441. {
  442. hasCompleted = true;
  443. if (_completeAt.CompareTo(now) > 0)
  444. {
  445. shouldWait = true;
  446. waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
  447. }
  448. }
  449. }
  450. } /* lock (_gate) */
  451. if (shouldWait)
  452. {
  // Block this loop until the scheduler fires the timer action after waitTime, or
  // until the stop signal is raised, in which case the pending notification is dropped.
  453. #if !NO_CDS
  454. var timer = new ManualResetEventSlim();
  455. _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
  456. try
  457. {
  458. timer.Wait(_stop.Token);
  459. }
  460. catch (OperationCanceledException)
  461. {
  462. return;
  463. }
  464. #else
  465. var timer = new ManualResetEvent(false);
  466. _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
  // Index 1 is _stop: the stop event was signalled.
  467. if (WaitHandle.WaitAny(new[] { timer, _stop }) == 1)
  468. return;
  469. #endif
  470. }
  // The decision taken under the lock is acted on here without re-checking _hasFailed,
  // so a value dequeued above is still forwarded after the wait.
  471. if (hasValue)
  472. {
  473. base._observer.OnNext(value);
  474. }
  475. else
  476. {
  477. if (hasCompleted)
  478. {
  479. base._observer.OnCompleted();
  480. base.Dispose();
  481. }
  482. else if (hasFailed)
  483. {
  484. base._observer.OnError(error);
  485. base.Dispose();
  486. }
  // Any iteration that did not deliver a value ends the loop.
  487. return;
  488. }
  489. }
  490. }
  491. }
  492. }
  493. class Delay<TSource, TDelay> : Producer<TSource>
  494. {
  495. private readonly IObservable<TSource> _source;
  496. private readonly IObservable<TDelay> _subscriptionDelay;
  497. private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
  /// <summary>
  /// Captures the source sequence, the optional subscription-delay sequence and the
  /// per-element delay selector.
  /// </summary>
  /// <param name="source">Sequence whose elements are to be delayed.</param>
  /// <param name="subscriptionDelay">Sequence stored as the subscription delay; the sink treats null as "subscribe immediately".</param>
  /// <param name="delaySelector">Function stored to obtain a delay sequence for each element.</param>
  public Delay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
  {
      this._delaySelector = delaySelector;
      this._subscriptionDelay = subscriptionDelay;
      this._source = source;
  }
  /// <summary>
  /// Creates the sink for a subscription, registers it through
  /// <paramref name="setSink"/> and starts it.
  /// </summary>
  /// <returns>The disposable returned by the sink's <c>Run</c>.</returns>
  protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      var worker = new _(this, observer, cancel);
      setSink(worker);

      var subscription = worker.Run();
      return subscription;
  }
  510. class _ : Sink<TSource>, IObserver<TSource>
  511. {
  512. private readonly Delay<TSource, TDelay> _parent;
  /// <summary>
  /// Creates the sink, passing the observer and cancel handle to the base sink and
  /// remembering the operator it belongs to.
  /// </summary>
  public _(Delay<TSource, TDelay> parent, IObserver<TSource> observer, IDisposable cancel)
      : base(observer, cancel)
  {
      this._parent = parent;
  }
  518. private CompositeDisposable _delays;
  519. private object _gate;
  520. private bool _atEnd;
  521. private SerialDisposable _subscription;
  /// <summary>
  /// Initializes the sink state and either subscribes to the source immediately
  /// (no subscription-delay sequence) or subscribes to the subscription-delay
  /// sequence, which starts the source subscription later.
  /// </summary>
  /// <returns>A disposable combining the current subscription and all pending delays.</returns>
  public IDisposable Run()
  {
      _gate = new object();
      _atEnd = false;
      _delays = new CompositeDisposable();
      _subscription = new SerialDisposable();

      var subscriptionDelay = _parent._subscriptionDelay;
      if (subscriptionDelay != null)
      {
          _subscription.Disposable = subscriptionDelay.SubscribeSafe(new SubscriptionDelay(this));
      }
      else
      {
          Start();
      }

      return StableCompositeDisposable.Create(_subscription, _delays);
  }
  /// <summary>
  /// Subscribes this sink to the source and stores the subscription in the serial
  /// disposable, replacing whatever it held before.
  /// </summary>
  private void Start()
  {
      var sourceSubscription = _parent._source.SubscribeSafe(this);
      _subscription.Disposable = sourceSubscription;
  }
  /// <summary>
  /// Obtains the delay sequence for a source element and subscribes a
  /// <c>Delta</c> observer to it. If the selector throws, the exception is forwarded
  /// as an error and the sink is disposed.
  /// </summary>
  /// <param name="value">The element received from the source.</param>
  public void OnNext(TSource value)
  {
      IObservable<TDelay> delaySequence;
      try
      {
          delaySequence = _parent._delaySelector(value);
      }
      catch (Exception error)
      {
          lock (_gate)
          {
              base._observer.OnError(error);
              base.Dispose();
          }

          return;
      }

      // Register the handle first so the Delta observer can remove it when it fires.
      var handle = new SingleAssignmentDisposable();
      _delays.Add(handle);
      handle.Disposable = delaySequence.SubscribeSafe(new Delta(this, value, handle));
  }
  /// <summary>
  /// Forwards a source error to the observer and disposes the sink, both while
  /// holding the gate.
  /// </summary>
  /// <param name="error">The exception reported by the source.</param>
  public void OnError(Exception error)
  {
      var gate = _gate;
      lock (gate)
      {
          base._observer.OnError(error);
          base.Dispose();
      }
  }
  /// <summary>
  /// Marks the source as finished, disposes the current subscription and completes
  /// the observer right away when no delays are pending.
  /// </summary>
  public void OnCompleted()
  {
      lock (_gate)
      {
          _atEnd = true;
          _subscription.Dispose();

          // Same check as CheckDone; _atEnd has just been set, so only the
          // pending-delay count remains to be tested.
          if (_delays.Count == 0)
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  /// <summary>
  /// Completes the observer and disposes the sink once the source has finished and
  /// no delays are pending. Does not take the gate itself; the callers in this class
  /// invoke it while holding the gate.
  /// </summary>
  private void CheckDone()
  {
      if (!_atEnd || _delays.Count != 0)
      {
          return;
      }

      base._observer.OnCompleted();
      base.Dispose();
  }
  /// <summary>
  /// Observer attached to the subscription-delay sequence. Either a value or a
  /// completion from that sequence starts the source subscription; an error is
  /// forwarded to the downstream observer and the sink is disposed.
  /// </summary>
  class SubscriptionDelay : IObserver<TDelay>
  {
      private readonly _ _parent;

      public SubscriptionDelay(_ parent)
      {
          this._parent = parent;
      }

      /// <summary>Starts the source subscription; the value itself is ignored.</summary>
      public void OnNext(TDelay value)
      {
          var sink = _parent;
          sink.Start();
      }

      /// <summary>Forwards the error downstream and disposes the sink.</summary>
      public void OnError(Exception error)
      {
          var sink = _parent;
          sink._observer.OnError(error);
          sink.Dispose();
      }

      /// <summary>Starts the source subscription.</summary>
      public void OnCompleted()
      {
          var sink = _parent;
          sink.Start();
      }
  }
  /// <summary>
  /// Observer attached to the delay sequence of a single source element. The first
  /// value or completion from that sequence forwards the stored element downstream,
  /// removes this entry from the parent's pending delays and lets the parent check
  /// for completion. Any later value or completion is ignored, so a delay sequence
  /// that signals OnNext followed by OnCompleted cannot emit the element twice.
  /// </summary>
  class Delta : IObserver<TDelay>
  {
      private readonly _ _parent;
      private readonly TSource _value;
      private readonly IDisposable _self;

      // Set once the element has been forwarded; only accessed under _parent._gate.
      private bool _emitted;

      public Delta(_ parent, TSource value, IDisposable self)
      {
          _parent = parent;
          _value = value;
          _self = self;
      }

      /// <summary>Forwards the stored element; the delay value itself is ignored.</summary>
      public void OnNext(TDelay value)
      {
          Emit();
      }

      /// <summary>Forwards the error downstream and disposes the sink, under the gate.</summary>
      public void OnError(Exception error)
      {
          lock (_parent._gate)
          {
              _parent._observer.OnError(error);
              _parent.Dispose();
          }
      }

      /// <summary>Forwards the stored element if it has not been forwarded yet.</summary>
      public void OnCompleted()
      {
          Emit();
      }

      // Forwards the stored element at most once, under the parent's gate.
      private void Emit()
      {
          lock (_parent._gate)
          {
              if (_emitted)
              {
                  return;
              }

              // Flag first so a re-entrant signal during the downstream call is ignored.
              _emitted = true;

              _parent._observer.OnNext(_value);
              _parent._delays.Remove(_self);
              _parent.CheckDone();
          }
      }
  }
  646. }
  647. }
  648. }
  649. #endif