Delay.cs 29 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. internal static class Delay<TSource>
  11. {
  12. internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._>
  13. where TParent : Base<TParent>
  14. {
  15. protected readonly IObservable<TSource> _source;
  16. protected readonly IScheduler _scheduler;
  17. public Base(IObservable<TSource> source, IScheduler scheduler)
  18. {
  19. _source = source;
  20. _scheduler = scheduler;
  21. }
  22. internal abstract class _ : IdentitySink<TSource>
  23. {
  24. public _(IObserver<TSource> observer)
  25. : base(observer)
  26. {
  27. }
  28. public abstract void Run(TParent parent);
  29. }
  30. internal abstract class S : _
  31. {
  32. protected readonly object _gate = new object();
  33. protected IDisposable _cancelable;
  34. protected readonly IScheduler _scheduler;
  35. public S(TParent parent, IObserver<TSource> observer)
  36. : base(observer)
  37. {
  38. _scheduler = parent._scheduler;
  39. }
  40. protected IStopwatch _watch;
  41. protected TimeSpan _delay;
  42. protected bool _ready;
  43. protected bool _active;
  44. protected bool _running;
  45. protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<Reactive.TimeInterval<TSource>>();
  46. private bool _hasCompleted;
  47. private TimeSpan _completeAt;
  48. private bool _hasFailed;
  49. private Exception _exception;
  50. public override void Run(TParent parent)
  51. {
  52. _watch = _scheduler.StartStopwatch();
  53. RunCore(parent);
  54. base.Run(parent._source);
  55. }
  56. protected override void Dispose(bool disposing)
  57. {
  58. base.Dispose(disposing);
  59. if (disposing)
  60. {
  61. Disposable.TryDispose(ref _cancelable);
  62. }
  63. }
  64. protected abstract void RunCore(TParent parent);
  65. public override void OnNext(TSource value)
  66. {
  67. var shouldRun = false;
  68. lock (_gate)
  69. {
  70. var next = _watch.Elapsed.Add(_delay);
  71. _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
  72. shouldRun = _ready && !_active;
  73. _active = true;
  74. }
  75. if (shouldRun)
  76. {
  77. Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(this, _delay, (@this, a) => @this.DrainQueue(a)));
  78. }
  79. }
  80. public override void OnError(Exception error)
  81. {
  82. DisposeUpstream();
  83. var shouldRun = false;
  84. lock (_gate)
  85. {
  86. _queue.Clear();
  87. _exception = error;
  88. _hasFailed = true;
  89. shouldRun = !_running;
  90. }
  91. if (shouldRun)
  92. {
  93. ForwardOnError(error);
  94. }
  95. }
  96. public override void OnCompleted()
  97. {
  98. DisposeUpstream();
  99. var shouldRun = false;
  100. lock (_gate)
  101. {
  102. var next = _watch.Elapsed.Add(_delay);
  103. _completeAt = next;
  104. _hasCompleted = true;
  105. shouldRun = _ready && !_active;
  106. _active = true;
  107. }
  108. if (shouldRun)
  109. {
  110. Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(this, _delay, (@this, a) => @this.DrainQueue(a)));
  111. }
  112. }
  113. protected void DrainQueue(Action<S, TimeSpan> recurse)
  114. {
  115. lock (_gate)
  116. {
  117. if (_hasFailed)
  118. return;
  119. _running = true;
  120. }
  121. //
  122. // The shouldYield flag was added to address TFS 487881: "Delay can be unfair". In the old
  123. // implementation, the loop below kept running while there was work for immediate dispatch,
  124. // potentially causing a long running work item on the target scheduler. With the addition
  125. // of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
  126. // interface and perform different processing (see LongRunningImpl). To reduce the code
  127. // churn in the old loop code here, we set the shouldYield flag to true after the first
  128. // dispatch iteration, in order to break from the loop and enter the recursive scheduling path.
  129. //
  130. var shouldYield = false;
  131. while (true)
  132. {
  133. var hasFailed = false;
  134. var error = default(Exception);
  135. var hasValue = false;
  136. var value = default(TSource);
  137. var hasCompleted = false;
  138. var shouldRecurse = false;
  139. var recurseDueTime = default(TimeSpan);
  140. lock (_gate)
  141. {
  142. if (_hasFailed)
  143. {
  144. error = _exception;
  145. hasFailed = true;
  146. _running = false;
  147. }
  148. else
  149. {
  150. var now = _watch.Elapsed;
  151. if (_queue.Count > 0)
  152. {
  153. var nextDue = _queue.Peek().Interval;
  154. if (nextDue.CompareTo(now) <= 0 && !shouldYield)
  155. {
  156. value = _queue.Dequeue().Value;
  157. hasValue = true;
  158. }
  159. else
  160. {
  161. shouldRecurse = true;
  162. recurseDueTime = Scheduler.Normalize(nextDue.Subtract(now));
  163. _running = false;
  164. }
  165. }
  166. else if (_hasCompleted)
  167. {
  168. if (_completeAt.CompareTo(now) <= 0 && !shouldYield)
  169. {
  170. hasCompleted = true;
  171. }
  172. else
  173. {
  174. shouldRecurse = true;
  175. recurseDueTime = Scheduler.Normalize(_completeAt.Subtract(now));
  176. _running = false;
  177. }
  178. }
  179. else
  180. {
  181. _running = false;
  182. _active = false;
  183. }
  184. }
  185. } /* lock (_gate) */
  186. if (hasValue)
  187. {
  188. ForwardOnNext(value);
  189. shouldYield = true;
  190. }
  191. else
  192. {
  193. if (hasCompleted)
  194. {
  195. ForwardOnCompleted();
  196. }
  197. else if (hasFailed)
  198. {
  199. ForwardOnError(error);
  200. }
  201. else if (shouldRecurse)
  202. {
  203. recurse(this, recurseDueTime);
  204. }
  205. return;
  206. }
  207. } /* while (true) */
  208. }
  209. }
  210. protected abstract class L : _
  211. {
  212. protected readonly object _gate = new object();
  213. protected IDisposable _cancelable;
  214. private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);
  215. private readonly IScheduler _scheduler;
  216. public L(TParent parent, IObserver<TSource> observer)
  217. : base(observer)
  218. {
  219. _scheduler = parent._scheduler;
  220. }
  221. protected IStopwatch _watch;
  222. protected TimeSpan _delay;
  223. protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
  224. private CancellationTokenSource _stop;
  225. private bool _hasCompleted;
  226. private TimeSpan _completeAt;
  227. private bool _hasFailed;
  228. private Exception _exception;
  229. public override void Run(TParent parent)
  230. {
  231. _watch = _scheduler.StartStopwatch();
  232. RunCore(parent);
  233. base.Run(parent._source);
  234. }
  235. protected override void Dispose(bool disposing)
  236. {
  237. base.Dispose(disposing);
  238. if (disposing)
  239. {
  240. Disposable.TryDispose(ref _cancelable);
  241. }
  242. }
  243. protected abstract void RunCore(TParent parent);
  244. protected void ScheduleDrain()
  245. {
  246. _stop = new CancellationTokenSource();
  247. Disposable.TrySetSerial(ref _cancelable, new CancellationDisposable(_stop));
  248. _scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
  249. }
  250. public override void OnNext(TSource value)
  251. {
  252. lock (_gate)
  253. {
  254. var next = _watch.Elapsed.Add(_delay);
  255. _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
  256. _evt.Release();
  257. }
  258. }
  259. public override void OnError(Exception error)
  260. {
  261. DisposeUpstream();
  262. lock (_gate)
  263. {
  264. _queue.Clear();
  265. _exception = error;
  266. _hasFailed = true;
  267. _evt.Release();
  268. }
  269. }
  270. public override void OnCompleted()
  271. {
  272. DisposeUpstream();
  273. lock (_gate)
  274. {
  275. var next = _watch.Elapsed.Add(_delay);
  276. _completeAt = next;
  277. _hasCompleted = true;
  278. _evt.Release();
  279. }
  280. }
  281. private void DrainQueue(ICancelable cancel)
  282. {
  283. while (true)
  284. {
  285. try
  286. {
  287. _evt.Wait(_stop.Token);
  288. }
  289. catch (OperationCanceledException)
  290. {
  291. return;
  292. }
  293. var hasFailed = false;
  294. var error = default(Exception);
  295. var hasValue = false;
  296. var value = default(TSource);
  297. var hasCompleted = false;
  298. var shouldWait = false;
  299. var waitTime = default(TimeSpan);
  300. lock (_gate)
  301. {
  302. if (_hasFailed)
  303. {
  304. error = _exception;
  305. hasFailed = true;
  306. }
  307. else
  308. {
  309. var now = _watch.Elapsed;
  310. if (_queue.Count > 0)
  311. {
  312. var next = _queue.Dequeue();
  313. hasValue = true;
  314. value = next.Value;
  315. var nextDue = next.Interval;
  316. if (nextDue.CompareTo(now) > 0)
  317. {
  318. shouldWait = true;
  319. waitTime = Scheduler.Normalize(nextDue.Subtract(now));
  320. }
  321. }
  322. else if (_hasCompleted)
  323. {
  324. hasCompleted = true;
  325. if (_completeAt.CompareTo(now) > 0)
  326. {
  327. shouldWait = true;
  328. waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
  329. }
  330. }
  331. }
  332. } /* lock (_gate) */
  333. if (shouldWait)
  334. {
  335. var timer = new ManualResetEventSlim();
  336. _scheduler.Schedule(timer, waitTime, (_, slimTimer) => { slimTimer.Set(); return Disposable.Empty; });
  337. try
  338. {
  339. timer.Wait(_stop.Token);
  340. }
  341. catch (OperationCanceledException)
  342. {
  343. return;
  344. }
  345. }
  346. if (hasValue)
  347. {
  348. ForwardOnNext(value);
  349. }
  350. else
  351. {
  352. if (hasCompleted)
  353. {
  354. ForwardOnCompleted();
  355. }
  356. else if (hasFailed)
  357. {
  358. ForwardOnError(error);
  359. }
  360. return;
  361. }
  362. }
  363. }
  364. }
  365. }
  366. internal sealed class Absolute : Base<Absolute>
  367. {
  368. private readonly DateTimeOffset _dueTime;
  369. public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  370. : base(source, scheduler)
  371. {
  372. _dueTime = dueTime;
  373. }
  374. protected override _ CreateSink(IObserver<TSource> observer) => _scheduler.AsLongRunning() != null ? (Base<Absolute>._)new L(this, observer) : new S(this, observer);
  375. protected override void Run(_ sink) => sink.Run(this);
  376. new private sealed class S : Base<Absolute>.S
  377. {
  378. public S(Absolute parent, IObserver<TSource> observer)
  379. : base(parent, observer)
  380. {
  381. }
  382. protected override void RunCore(Absolute parent)
  383. {
  384. _ready = false;
  385. Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
  386. }
  387. private IDisposable Start()
  388. {
  389. var next = default(TimeSpan);
  390. var shouldRun = false;
  391. lock (_gate)
  392. {
  393. _delay = _watch.Elapsed;
  394. var oldQueue = _queue;
  395. _queue = new Queue<Reactive.TimeInterval<TSource>>();
  396. if (oldQueue.Count > 0)
  397. {
  398. next = oldQueue.Peek().Interval;
  399. while (oldQueue.Count > 0)
  400. {
  401. var item = oldQueue.Dequeue();
  402. _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
  403. }
  404. shouldRun = true;
  405. _active = true;
  406. }
  407. _ready = true;
  408. }
  409. if (shouldRun)
  410. {
  411. Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule((Base<Absolute>.S)this, next, (@this, a) => DrainQueue(a)));
  412. }
  413. return Disposable.Empty;
  414. }
  415. }
  416. new private sealed class L : Base<Absolute>.L
  417. {
  418. public L(Absolute parent, IObserver<TSource> observer)
  419. : base(parent, observer)
  420. {
  421. }
  422. protected override void RunCore(Absolute parent)
  423. {
  424. // ScheduleDrain might have already set a newer disposable
  425. // using TrySetSerial would cancel it, stopping the emission
  426. // and hang the consumer
  427. Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
  428. }
  429. private IDisposable Start()
  430. {
  431. lock (_gate)
  432. {
  433. _delay = _watch.Elapsed;
  434. var oldQueue = _queue;
  435. _queue = new Queue<Reactive.TimeInterval<TSource>>();
  436. while (oldQueue.Count > 0)
  437. {
  438. var item = oldQueue.Dequeue();
  439. _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
  440. }
  441. }
  442. ScheduleDrain();
  443. return Disposable.Empty;
  444. }
  445. }
  446. }
  447. internal sealed class Relative : Base<Relative>
  448. {
  449. private readonly TimeSpan _dueTime;
  450. public Relative(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  451. : base(source, scheduler)
  452. {
  453. _dueTime = dueTime;
  454. }
  455. protected override _ CreateSink(IObserver<TSource> observer) => _scheduler.AsLongRunning() != null ? (Base<Relative>._)new L(this, observer) : new S(this, observer);
  456. protected override void Run(_ sink) => sink.Run(this);
  457. new private sealed class S : Base<Relative>.S
  458. {
  459. public S(Relative parent, IObserver<TSource> observer)
  460. : base(parent, observer)
  461. {
  462. }
  463. protected override void RunCore(Relative parent)
  464. {
  465. _ready = true;
  466. _delay = Scheduler.Normalize(parent._dueTime);
  467. }
  468. }
  469. new private sealed class L : Base<Relative>.L
  470. {
  471. public L(Relative parent, IObserver<TSource> observer)
  472. : base(parent, observer)
  473. {
  474. }
  475. protected override void RunCore(Relative parent)
  476. {
  477. _delay = Scheduler.Normalize(parent._dueTime);
  478. ScheduleDrain();
  479. }
  480. }
  481. }
  482. }
  483. internal static class Delay<TSource, TDelay>
  484. {
  485. internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._>
  486. where TParent : Base<TParent>
  487. {
  488. protected readonly IObservable<TSource> _source;
  489. public Base(IObservable<TSource> source)
  490. {
  491. _source = source;
  492. }
  493. internal abstract class _ : IdentitySink<TSource>
  494. {
  495. private readonly CompositeDisposable _delays = new CompositeDisposable();
  496. private object _gate = new object();
  497. private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
  498. public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
  499. : base(observer)
  500. {
  501. _delaySelector = delaySelector;
  502. }
  503. private bool _atEnd;
  504. private IDisposable _subscription;
  505. public void Run(TParent parent)
  506. {
  507. _atEnd = false;
  508. Disposable.SetSingle(ref _subscription, RunCore(parent));
  509. }
  510. protected override void Dispose(bool disposing)
  511. {
  512. if (disposing)
  513. {
  514. Disposable.TryDispose(ref _subscription);
  515. _delays.Dispose();
  516. }
  517. base.Dispose(disposing);
  518. }
  519. protected abstract IDisposable RunCore(TParent parent);
  520. public override void OnNext(TSource value)
  521. {
  522. var delay = default(IObservable<TDelay>);
  523. try
  524. {
  525. delay = _delaySelector(value);
  526. }
  527. catch (Exception error)
  528. {
  529. lock (_gate)
  530. {
  531. ForwardOnError(error);
  532. }
  533. return;
  534. }
  535. var observer = new DelayObserver(this, value);
  536. _delays.Add(observer);
  537. observer.SetResource(delay.SubscribeSafe(observer));
  538. }
  539. public override void OnError(Exception error)
  540. {
  541. lock (_gate)
  542. {
  543. ForwardOnError(error);
  544. }
  545. }
  546. public override void OnCompleted()
  547. {
  548. lock (_gate)
  549. {
  550. _atEnd = true;
  551. _subscription.Dispose();
  552. CheckDone();
  553. }
  554. }
  555. private void CheckDone()
  556. {
  557. if (_atEnd && _delays.Count == 0)
  558. {
  559. ForwardOnCompleted();
  560. }
  561. }
  562. private sealed class DelayObserver : SafeObserver<TDelay>
  563. {
  564. private readonly _ _parent;
  565. private readonly TSource _value;
  566. public DelayObserver(_ parent, TSource value)
  567. {
  568. _parent = parent;
  569. _value = value;
  570. }
  571. public override void OnNext(TDelay value)
  572. {
  573. lock (_parent._gate)
  574. {
  575. _parent.ForwardOnNext(_value);
  576. _parent._delays.Remove(this);
  577. _parent.CheckDone();
  578. }
  579. }
  580. public override void OnError(Exception error)
  581. {
  582. lock (_parent._gate)
  583. {
  584. _parent.ForwardOnError(error);
  585. }
  586. }
  587. public override void OnCompleted()
  588. {
  589. lock (_parent._gate)
  590. {
  591. _parent.ForwardOnNext(_value);
  592. _parent._delays.Remove(this);
  593. _parent.CheckDone();
  594. }
  595. }
  596. }
  597. }
  598. }
  599. internal class Selector : Base<Selector>
  600. {
  601. private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
  602. public Selector(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delaySelector)
  603. : base(source)
  604. {
  605. _delaySelector = delaySelector;
  606. }
  607. protected override Base<Selector>._ CreateSink(IObserver<TSource> observer) => new _(_delaySelector, observer);
  608. protected override void Run(Base<Selector>._ sink) => sink.Run(this);
  609. new private sealed class _ : Base<Selector>._
  610. {
  611. public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
  612. : base(delaySelector, observer)
  613. {
  614. }
  615. protected override IDisposable RunCore(Selector parent) => parent._source.SubscribeSafe(this);
  616. }
  617. }
  618. internal sealed class SelectorWithSubscriptionDelay : Base<SelectorWithSubscriptionDelay>
  619. {
  620. private readonly IObservable<TDelay> _subscriptionDelay;
  621. private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
  622. public SelectorWithSubscriptionDelay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
  623. : base(source)
  624. {
  625. _subscriptionDelay = subscriptionDelay;
  626. _delaySelector = delaySelector;
  627. }
  628. protected override Base<SelectorWithSubscriptionDelay>._ CreateSink(IObserver<TSource> observer) => new _(_delaySelector, observer);
  629. protected override void Run(Base<SelectorWithSubscriptionDelay>._ sink) => sink.Run(this);
  630. new private sealed class _ : Base<SelectorWithSubscriptionDelay>._
  631. {
  632. public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
  633. : base(delaySelector, observer)
  634. {
  635. }
  636. protected override IDisposable RunCore(SelectorWithSubscriptionDelay parent)
  637. {
  638. var delayConsumer = new SubscriptionDelayObserver(this, parent._source);
  639. delayConsumer.SetFirst(parent._subscriptionDelay.SubscribeSafe(delayConsumer));
  640. return delayConsumer;
  641. }
  642. private sealed class SubscriptionDelayObserver : IObserver<TDelay>, IDisposable
  643. {
  644. private readonly _ _parent;
  645. private readonly IObservable<TSource> _source;
  646. private IDisposable _subscription;
  647. public SubscriptionDelayObserver(_ parent, IObservable<TSource> source)
  648. {
  649. _parent = parent;
  650. _source = source;
  651. }
  652. internal void SetFirst(IDisposable d)
  653. {
  654. Disposable.TrySetSingle(ref _subscription, d);
  655. }
  656. public void OnNext(TDelay value)
  657. {
  658. Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
  659. }
  660. public void OnError(Exception error)
  661. {
  662. _parent.ForwardOnError(error);
  663. }
  664. public void OnCompleted()
  665. {
  666. Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
  667. }
  668. public void Dispose()
  669. {
  670. Disposable.TryDispose(ref _subscription);
  671. }
  672. }
  673. }
  674. }
  675. }
  676. }