Delay.cs 28 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. internal static class Delay<TSource>
  11. {
  12. internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._>
  13. where TParent : Base<TParent>
  14. {
  15. protected readonly IObservable<TSource> _source;
  16. protected readonly IScheduler _scheduler;
  17. protected Base(IObservable<TSource> source, IScheduler scheduler)
  18. {
  19. _source = source;
  20. _scheduler = scheduler;
  21. }
  22. internal abstract class _ : IdentitySink<TSource>
  23. {
  24. protected IStopwatch _watch;
  25. protected IScheduler _scheduler;
  26. protected _(TParent parent, IObserver<TSource> observer)
  27. : base(observer)
  28. {
  29. _scheduler = parent._scheduler;
  30. }
  31. public void Run(TParent parent)
  32. {
  33. _watch = _scheduler.StartStopwatch();
  34. RunCore(parent);
  35. base.Run(parent._source);
  36. }
  37. protected abstract void RunCore(TParent parent);
  38. }
  39. internal abstract class S : _
  40. {
  41. protected readonly object _gate = new object();
  42. protected IDisposable _cancelable;
  43. protected S(TParent parent, IObserver<TSource> observer)
  44. : base(parent, observer)
  45. {
  46. }
  47. protected TimeSpan _delay;
  48. protected bool _ready;
  49. protected bool _active;
  50. protected bool _running;
  51. protected Queue<Reactive.TimeInterval<TSource>> _queue = new Queue<Reactive.TimeInterval<TSource>>();
  52. private bool _hasCompleted;
  53. private TimeSpan _completeAt;
  54. private bool _hasFailed;
  55. private Exception _exception;
  56. protected override void Dispose(bool disposing)
  57. {
  58. base.Dispose(disposing);
  59. if (disposing)
  60. {
  61. Disposable.TryDispose(ref _cancelable);
  62. }
  63. }
  64. public override void OnNext(TSource value)
  65. {
  66. var shouldRun = false;
  67. lock (_gate)
  68. {
  69. var next = _watch.Elapsed.Add(_delay);
  70. _queue.Enqueue(new Reactive.TimeInterval<TSource>(value, next));
  71. shouldRun = _ready && !_active;
  72. _active = true;
  73. }
  74. if (shouldRun)
  75. {
  76. Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(this, _delay, (@this, a) => @this.DrainQueue(a)));
  77. }
  78. }
  79. public override void OnError(Exception error)
  80. {
  81. DisposeUpstream();
  82. var shouldRun = false;
  83. lock (_gate)
  84. {
  85. _queue.Clear();
  86. _exception = error;
  87. _hasFailed = true;
  88. shouldRun = !_running;
  89. }
  90. if (shouldRun)
  91. {
  92. ForwardOnError(error);
  93. }
  94. }
  95. public override void OnCompleted()
  96. {
  97. DisposeUpstream();
  98. var shouldRun = false;
  99. lock (_gate)
  100. {
  101. var next = _watch.Elapsed.Add(_delay);
  102. _completeAt = next;
  103. _hasCompleted = true;
  104. shouldRun = _ready && !_active;
  105. _active = true;
  106. }
  107. if (shouldRun)
  108. {
  109. Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(this, _delay, (@this, a) => @this.DrainQueue(a)));
  110. }
  111. }
  112. protected void DrainQueue(Action<S, TimeSpan> recurse)
  113. {
  114. lock (_gate)
  115. {
  116. if (_hasFailed)
  117. {
  118. return;
  119. }
  120. _running = true;
  121. }
  122. //
  123. // The shouldYield flag was added to address TFS 487881: "Delay can be unfair". In the old
  124. // implementation, the loop below kept running while there was work for immediate dispatch,
  125. // potentially causing a long running work item on the target scheduler. With the addition
  126. // of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
  127. // interface and perform different processing (see LongRunningImpl). To reduce the code
  128. // churn in the old loop code here, we set the shouldYield flag to true after the first
  129. // dispatch iteration, in order to break from the loop and enter the recursive scheduling path.
  130. //
  131. var shouldYield = false;
  132. while (true)
  133. {
  134. var hasFailed = false;
  135. var error = default(Exception);
  136. var hasValue = false;
  137. var value = default(TSource);
  138. var hasCompleted = false;
  139. var shouldRecurse = false;
  140. var recurseDueTime = default(TimeSpan);
  141. lock (_gate)
  142. {
  143. if (_hasFailed)
  144. {
  145. error = _exception;
  146. hasFailed = true;
  147. _running = false;
  148. }
  149. else
  150. {
  151. var now = _watch.Elapsed;
  152. if (_queue.Count > 0)
  153. {
  154. var nextDue = _queue.Peek().Interval;
  155. if (nextDue.CompareTo(now) <= 0 && !shouldYield)
  156. {
  157. value = _queue.Dequeue().Value;
  158. hasValue = true;
  159. }
  160. else
  161. {
  162. shouldRecurse = true;
  163. recurseDueTime = Scheduler.Normalize(nextDue.Subtract(now));
  164. _running = false;
  165. }
  166. }
  167. else if (_hasCompleted)
  168. {
  169. if (_completeAt.CompareTo(now) <= 0 && !shouldYield)
  170. {
  171. hasCompleted = true;
  172. }
  173. else
  174. {
  175. shouldRecurse = true;
  176. recurseDueTime = Scheduler.Normalize(_completeAt.Subtract(now));
  177. _running = false;
  178. }
  179. }
  180. else
  181. {
  182. _running = false;
  183. _active = false;
  184. }
  185. }
  186. } /* lock (_gate) */
  187. if (hasValue)
  188. {
  189. ForwardOnNext(value);
  190. shouldYield = true;
  191. }
  192. else
  193. {
  194. if (hasCompleted)
  195. {
  196. ForwardOnCompleted();
  197. }
  198. else if (hasFailed)
  199. {
  200. ForwardOnError(error);
  201. }
  202. else if (shouldRecurse)
  203. {
  204. recurse(this, recurseDueTime);
  205. }
  206. return;
  207. }
  208. } /* while (true) */
  209. }
  210. }
  211. protected abstract class L : _
  212. {
  213. protected readonly object _gate = new object();
  214. protected IDisposable _cancelable;
  215. private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);
  216. protected L(TParent parent, IObserver<TSource> observer)
  217. : base(parent, observer)
  218. {
  219. }
  220. protected TimeSpan _delay;
  221. protected Queue<Reactive.TimeInterval<TSource>> _queue = new Queue<Reactive.TimeInterval<TSource>>();
  222. private CancellationTokenSource _stop;
  223. private bool _hasCompleted;
  224. private TimeSpan _completeAt;
  225. private bool _hasFailed;
  226. private Exception _exception;
  227. protected override void Dispose(bool disposing)
  228. {
  229. base.Dispose(disposing);
  230. if (disposing)
  231. {
  232. Disposable.TryDispose(ref _cancelable);
  233. }
  234. }
  235. protected void ScheduleDrain()
  236. {
  237. _stop = new CancellationTokenSource();
  238. Disposable.TrySetSerial(ref _cancelable, new CancellationDisposable(_stop));
  239. _scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
  240. }
  241. public override void OnNext(TSource value)
  242. {
  243. lock (_gate)
  244. {
  245. var next = _watch.Elapsed.Add(_delay);
  246. _queue.Enqueue(new Reactive.TimeInterval<TSource>(value, next));
  247. _evt.Release();
  248. }
  249. }
  250. public override void OnError(Exception error)
  251. {
  252. DisposeUpstream();
  253. lock (_gate)
  254. {
  255. _queue.Clear();
  256. _exception = error;
  257. _hasFailed = true;
  258. _evt.Release();
  259. }
  260. }
  261. public override void OnCompleted()
  262. {
  263. DisposeUpstream();
  264. lock (_gate)
  265. {
  266. var next = _watch.Elapsed.Add(_delay);
  267. _completeAt = next;
  268. _hasCompleted = true;
  269. _evt.Release();
  270. }
  271. }
  272. private void DrainQueue(ICancelable cancel)
  273. {
  274. while (true)
  275. {
  276. try
  277. {
  278. _evt.Wait(_stop.Token);
  279. }
  280. catch (OperationCanceledException)
  281. {
  282. return;
  283. }
  284. var hasFailed = false;
  285. var error = default(Exception);
  286. var hasValue = false;
  287. var value = default(TSource);
  288. var hasCompleted = false;
  289. var shouldWait = false;
  290. var waitTime = default(TimeSpan);
  291. lock (_gate)
  292. {
  293. if (_hasFailed)
  294. {
  295. error = _exception;
  296. hasFailed = true;
  297. }
  298. else
  299. {
  300. var now = _watch.Elapsed;
  301. if (_queue.Count > 0)
  302. {
  303. var next = _queue.Dequeue();
  304. hasValue = true;
  305. value = next.Value;
  306. var nextDue = next.Interval;
  307. if (nextDue.CompareTo(now) > 0)
  308. {
  309. shouldWait = true;
  310. waitTime = Scheduler.Normalize(nextDue.Subtract(now));
  311. }
  312. }
  313. else if (_hasCompleted)
  314. {
  315. hasCompleted = true;
  316. if (_completeAt.CompareTo(now) > 0)
  317. {
  318. shouldWait = true;
  319. waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
  320. }
  321. }
  322. }
  323. } /* lock (_gate) */
  324. if (shouldWait)
  325. {
  326. var timer = new ManualResetEventSlim();
  327. _scheduler.ScheduleAction(timer, waitTime, slimTimer => { slimTimer.Set(); });
  328. try
  329. {
  330. timer.Wait(_stop.Token);
  331. }
  332. catch (OperationCanceledException)
  333. {
  334. return;
  335. }
  336. }
  337. if (hasValue)
  338. {
  339. ForwardOnNext(value);
  340. }
  341. else
  342. {
  343. if (hasCompleted)
  344. {
  345. ForwardOnCompleted();
  346. }
  347. else if (hasFailed)
  348. {
  349. ForwardOnError(error);
  350. }
  351. return;
  352. }
  353. }
  354. }
  355. }
  356. }
  357. internal sealed class Absolute : Base<Absolute>
  358. {
  359. private readonly DateTimeOffset _dueTime;
  360. public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
  361. : base(source, scheduler)
  362. {
  363. _dueTime = dueTime;
  364. }
  365. protected override _ CreateSink(IObserver<TSource> observer) => _scheduler.AsLongRunning() != null ? (_)new L(this, observer) : new S(this, observer);
  366. protected override void Run(_ sink) => sink.Run(this);
  367. private new sealed class S : Base<Absolute>.S
  368. {
  369. public S(Absolute parent, IObserver<TSource> observer)
  370. : base(parent, observer)
  371. {
  372. }
  373. protected override void RunCore(Absolute parent)
  374. {
  375. _ready = false;
  376. Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start()));
  377. }
  378. private void Start()
  379. {
  380. var next = default(TimeSpan);
  381. var shouldRun = false;
  382. lock (_gate)
  383. {
  384. _delay = _watch.Elapsed;
  385. var oldQueue = _queue;
  386. _queue = new Queue<Reactive.TimeInterval<TSource>>();
  387. if (oldQueue.Count > 0)
  388. {
  389. next = oldQueue.Peek().Interval;
  390. while (oldQueue.Count > 0)
  391. {
  392. var item = oldQueue.Dequeue();
  393. _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
  394. }
  395. shouldRun = true;
  396. _active = true;
  397. }
  398. _ready = true;
  399. }
  400. if (shouldRun)
  401. {
  402. Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule((Base<Absolute>.S)this, next, (@this, a) => DrainQueue(a)));
  403. }
  404. }
  405. }
  406. private new sealed class L : Base<Absolute>.L
  407. {
  408. public L(Absolute parent, IObserver<TSource> observer)
  409. : base(parent, observer)
  410. {
  411. }
  412. protected override void RunCore(Absolute parent)
  413. {
  414. // ScheduleDrain might have already set a newer disposable
  415. // using TrySetSerial would cancel it, stopping the emission
  416. // and hang the consumer
  417. Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start()));
  418. }
  419. private void Start()
  420. {
  421. lock (_gate)
  422. {
  423. _delay = _watch.Elapsed;
  424. var oldQueue = _queue;
  425. _queue = new Queue<Reactive.TimeInterval<TSource>>();
  426. while (oldQueue.Count > 0)
  427. {
  428. var item = oldQueue.Dequeue();
  429. _queue.Enqueue(new Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
  430. }
  431. }
  432. ScheduleDrain();
  433. }
  434. }
  435. }
  436. internal sealed class Relative : Base<Relative>
  437. {
  438. private readonly TimeSpan _dueTime;
  439. public Relative(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
  440. : base(source, scheduler)
  441. {
  442. _dueTime = dueTime;
  443. }
  444. protected override _ CreateSink(IObserver<TSource> observer) => _scheduler.AsLongRunning() != null ? (_)new L(this, observer) : new S(this, observer);
  445. protected override void Run(_ sink) => sink.Run(this);
  446. private new sealed class S : Base<Relative>.S
  447. {
  448. public S(Relative parent, IObserver<TSource> observer)
  449. : base(parent, observer)
  450. {
  451. }
  452. protected override void RunCore(Relative parent)
  453. {
  454. _ready = true;
  455. _delay = Scheduler.Normalize(parent._dueTime);
  456. }
  457. }
  458. private new sealed class L : Base<Relative>.L
  459. {
  460. public L(Relative parent, IObserver<TSource> observer)
  461. : base(parent, observer)
  462. {
  463. }
  464. protected override void RunCore(Relative parent)
  465. {
  466. _delay = Scheduler.Normalize(parent._dueTime);
  467. ScheduleDrain();
  468. }
  469. }
  470. }
  471. }
  472. internal static class Delay<TSource, TDelay>
  473. {
  474. internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._>
  475. where TParent : Base<TParent>
  476. {
  477. protected readonly IObservable<TSource> _source;
  478. protected Base(IObservable<TSource> source)
  479. {
  480. _source = source;
  481. }
  482. internal abstract class _ : IdentitySink<TSource>
  483. {
  484. private readonly CompositeDisposable _delays = new CompositeDisposable();
  485. private object _gate = new object();
  486. private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
  487. protected _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
  488. : base(observer)
  489. {
  490. _delaySelector = delaySelector;
  491. }
  492. private bool _atEnd;
  493. private IDisposable _subscription;
  494. public void Run(TParent parent)
  495. {
  496. _atEnd = false;
  497. Disposable.SetSingle(ref _subscription, RunCore(parent));
  498. }
  499. protected override void Dispose(bool disposing)
  500. {
  501. if (disposing)
  502. {
  503. Disposable.TryDispose(ref _subscription);
  504. _delays.Dispose();
  505. }
  506. base.Dispose(disposing);
  507. }
  508. protected abstract IDisposable RunCore(TParent parent);
  509. public override void OnNext(TSource value)
  510. {
  511. var delay = default(IObservable<TDelay>);
  512. try
  513. {
  514. delay = _delaySelector(value);
  515. }
  516. catch (Exception error)
  517. {
  518. lock (_gate)
  519. {
  520. ForwardOnError(error);
  521. }
  522. return;
  523. }
  524. var observer = new DelayObserver(this, value);
  525. _delays.Add(observer);
  526. observer.SetResource(delay.SubscribeSafe(observer));
  527. }
  528. public override void OnError(Exception error)
  529. {
  530. lock (_gate)
  531. {
  532. ForwardOnError(error);
  533. }
  534. }
  535. public override void OnCompleted()
  536. {
  537. lock (_gate)
  538. {
  539. _atEnd = true;
  540. _subscription.Dispose();
  541. CheckDone();
  542. }
  543. }
  544. private void CheckDone()
  545. {
  546. if (_atEnd && _delays.Count == 0)
  547. {
  548. ForwardOnCompleted();
  549. }
  550. }
  551. private sealed class DelayObserver : SafeObserver<TDelay>
  552. {
  553. private readonly _ _parent;
  554. private readonly TSource _value;
  555. public DelayObserver(_ parent, TSource value)
  556. {
  557. _parent = parent;
  558. _value = value;
  559. }
  560. public override void OnNext(TDelay value)
  561. {
  562. lock (_parent._gate)
  563. {
  564. _parent.ForwardOnNext(_value);
  565. _parent._delays.Remove(this);
  566. _parent.CheckDone();
  567. }
  568. }
  569. public override void OnError(Exception error)
  570. {
  571. lock (_parent._gate)
  572. {
  573. _parent.ForwardOnError(error);
  574. }
  575. }
  576. public override void OnCompleted()
  577. {
  578. lock (_parent._gate)
  579. {
  580. _parent.ForwardOnNext(_value);
  581. _parent._delays.Remove(this);
  582. _parent.CheckDone();
  583. }
  584. }
  585. }
  586. }
  587. }
  588. internal class Selector : Base<Selector>
  589. {
  590. private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
  591. public Selector(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delaySelector)
  592. : base(source)
  593. {
  594. _delaySelector = delaySelector;
  595. }
  596. protected override Base<Selector>._ CreateSink(IObserver<TSource> observer) => new _(_delaySelector, observer);
  597. protected override void Run(Base<Selector>._ sink) => sink.Run(this);
  598. private new sealed class _ : Base<Selector>._
  599. {
  600. public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
  601. : base(delaySelector, observer)
  602. {
  603. }
  604. protected override IDisposable RunCore(Selector parent) => parent._source.SubscribeSafe(this);
  605. }
  606. }
  607. internal sealed class SelectorWithSubscriptionDelay : Base<SelectorWithSubscriptionDelay>
  608. {
  609. private readonly IObservable<TDelay> _subscriptionDelay;
  610. private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
  611. public SelectorWithSubscriptionDelay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
  612. : base(source)
  613. {
  614. _subscriptionDelay = subscriptionDelay;
  615. _delaySelector = delaySelector;
  616. }
  617. protected override Base<SelectorWithSubscriptionDelay>._ CreateSink(IObserver<TSource> observer) => new _(_delaySelector, observer);
  618. protected override void Run(Base<SelectorWithSubscriptionDelay>._ sink) => sink.Run(this);
  619. private new sealed class _ : Base<SelectorWithSubscriptionDelay>._
  620. {
  621. public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
  622. : base(delaySelector, observer)
  623. {
  624. }
  625. protected override IDisposable RunCore(SelectorWithSubscriptionDelay parent)
  626. {
  627. var delayConsumer = new SubscriptionDelayObserver(this, parent._source);
  628. delayConsumer.SetFirst(parent._subscriptionDelay.SubscribeSafe(delayConsumer));
  629. return delayConsumer;
  630. }
  631. private sealed class SubscriptionDelayObserver : IObserver<TDelay>, IDisposable
  632. {
  633. private readonly _ _parent;
  634. private readonly IObservable<TSource> _source;
  635. private IDisposable _subscription;
  636. public SubscriptionDelayObserver(_ parent, IObservable<TSource> source)
  637. {
  638. _parent = parent;
  639. _source = source;
  640. }
  641. internal void SetFirst(IDisposable d)
  642. {
  643. Disposable.TrySetSingle(ref _subscription, d);
  644. }
  645. public void OnNext(TDelay value)
  646. {
  647. Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
  648. }
  649. public void OnError(Exception error)
  650. {
  651. _parent.ForwardOnError(error);
  652. }
  653. public void OnCompleted()
  654. {
  655. Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
  656. }
  657. public void Dispose()
  658. {
  659. Disposable.TryDispose(ref _subscription);
  660. }
  661. }
  662. }
  663. }
  664. }
  665. }