// Delay.cs
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. internal static class Delay<TSource>
  11. {
  12. internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._>
  13. where TParent : Base<TParent>
  14. {
  /// <summary>Sequence whose notifications the sinks subscribe to.</summary>
  protected readonly IObservable<TSource> _source;

  /// <summary>Scheduler that the sinks copy and use for their stopwatch and timed work.</summary>
  protected readonly IScheduler _scheduler;

  /// <summary>Stores the source sequence and the scheduler for later use by the sinks.</summary>
  /// <param name="source">Sequence to delay.</param>
  /// <param name="scheduler">Scheduler on which delayed work is run.</param>
  protected Base(IObservable<TSource> source, IScheduler scheduler)
  {
      _scheduler = scheduler;
      _source = source;
  }
  /// <summary>
  /// Common base for the time-based delay sinks: it owns the scheduler reference and the stopwatch
  /// that the derived sinks read through <see cref="Elapsed"/>.
  /// </summary>
  internal abstract class _ : IdentitySink<TSource>
  {
      protected readonly IScheduler _scheduler;

      // Assigned by Run; stays null until then.
      private IStopwatch? _watch;

      protected _(TParent parent, IObserver<TSource> observer)
          : base(observer)
      {
          _scheduler = parent._scheduler;
      }

      /// <summary>Time measured by the stopwatch that <see cref="Run(TParent)"/> started.</summary>
      protected TimeSpan Elapsed
      {
          get
          {
              // NB: Only used after Run is called.
              return _watch!.Elapsed;
          }
      }

      /// <summary>Variant-specific start-up, invoked by <see cref="Run(TParent)"/> before the source is subscribed.</summary>
      protected abstract void RunCore(TParent parent);

      /// <summary>
      /// Starts the stopwatch, lets the derived sink initialise itself, then runs the sink against
      /// the parent's source. The three steps happen in exactly this order.
      /// </summary>
      public void Run(TParent parent)
      {
          _watch = _scheduler.StartStopwatch();

          RunCore(parent);

          base.Run(parent._source);
      }
  }
  /// <summary>
  /// Delay sink that drains its queue through recursive scheduling. The concrete parents create it
  /// when the scheduler does not expose long-running scheduling.
  /// </summary>
  internal abstract class S : _
  {
      protected readonly object _gate = new object();
      protected SerialDisposableValue _cancelable;

      protected S(TParent parent, IObserver<TSource> observer)
          : base(parent, observer)
      {
      }

      // Offset added to the elapsed time to compute when a notification becomes due.
      protected TimeSpan _delay;

      // Draining may only be started once this is set.
      protected bool _ready;

      // A drain has been requested and has not yet found the sink idle.
      protected bool _active;

      // A drain work item is currently inside its dispatch loop.
      protected bool _running;

      // Pending values; Interval holds the elapsed time at which the value is due.
      protected Queue<Reactive.TimeInterval<TSource>> _queue = new Queue<Reactive.TimeInterval<TSource>>();

      private bool _hasCompleted;
      private TimeSpan _completeAt;
      private bool _hasFailed;
      private Exception? _exception;

      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          _cancelable.Dispose();
      }

      /// <summary>Queues the value with its due time and starts a drain if none is active.</summary>
      public override void OnNext(TSource value)
      {
          bool startDrain;

          lock (_gate)
          {
              var dueAt = Elapsed.Add(_delay);
              _queue.Enqueue(new Reactive.TimeInterval<TSource>(value, dueAt));

              startDrain = _ready && !_active;
              _active = true;
          }

          if (!startDrain)
          {
              return;
          }

          DrainQueue(_delay);
      }

      /// <summary>
      /// Drops all queued values and records the error. The error is forwarded right away unless a
      /// drain loop is running, in which case that loop forwards it.
      /// </summary>
      public override void OnError(Exception error)
      {
          DisposeUpstream();

          bool forwardNow;

          lock (_gate)
          {
              _queue.Clear();

              _hasFailed = true;
              _exception = error;

              forwardNow = !_running;
          }

          if (!forwardNow)
          {
              return;
          }

          ForwardOnError(error);
      }

      /// <summary>Records the due time of the completion and starts a drain if none is active.</summary>
      public override void OnCompleted()
      {
          DisposeUpstream();

          bool startDrain;

          lock (_gate)
          {
              _completeAt = Elapsed.Add(_delay);
              _hasCompleted = true;

              startDrain = _ready && !_active;
              _active = true;
          }

          if (!startDrain)
          {
              return;
          }

          DrainQueue(_delay);
      }

      /// <summary>Schedules a drain after <paramref name="next"/> and makes it the current cancelable work.</summary>
      protected void DrainQueue(TimeSpan next)
      {
          var work = _scheduler.Schedule(this, next, static (@this, a) => @this.DrainQueue(a));
          _cancelable.Disposable = work;
      }

      /// <summary>
      /// Body of a drain work item: dispatches at most one due value, then either forwards a terminal
      /// notification, reschedules itself through <paramref name="recurse"/>, or goes idle.
      /// </summary>
      private void DrainQueue(Action<S, TimeSpan> recurse)
      {
          lock (_gate)
          {
              if (_hasFailed)
              {
                  // OnError forwards the error itself when no drain is running.
                  return;
              }

              _running = true;
          }

          //
          // The yield flag addresses TFS 487881: "Delay can be unfair". The old implementation kept
          // looping while there was work for immediate dispatch, which could turn into a long
          // running work item on the target scheduler. Since Rx v2.0 introduced long-running
          // scheduling, schedulers supporting it are handled by different processing (see
          // LongRunningImpl). To limit churn in this loop, the flag is set after the first dispatch
          // so that the next iteration leaves the loop via the recursive scheduling path.
          //
          var yieldAfterDispatch = false;

          for (; ; )
          {
              Exception? failure = null;
              var failed = false;

              var item = default(TSource);
              var gotItem = false;

              var completed = false;

              var reschedule = false;
              var rescheduleIn = default(TimeSpan);

              lock (_gate)
              {
                  if (_hasFailed)
                  {
                      failure = _exception;
                      failed = true;
                      _running = false;
                  }
                  else
                  {
                      var now = Elapsed;
                      var hasQueued = _queue.Count > 0;

                      if (!hasQueued && !_hasCompleted)
                      {
                          // Nothing pending: go idle so the next notification starts a fresh drain.
                          _running = false;
                          _active = false;
                      }
                      else
                      {
                          // Queued values take precedence over the completion.
                          var dueAt = hasQueued ? _queue.Peek().Interval : _completeAt;

                          if (dueAt.CompareTo(now) <= 0 && !yieldAfterDispatch)
                          {
                              if (hasQueued)
                              {
                                  item = _queue.Dequeue().Value;
                                  gotItem = true;
                              }
                              else
                              {
                                  completed = true;
                              }
                          }
                          else
                          {
                              reschedule = true;
                              rescheduleIn = Scheduler.Normalize(dueAt.Subtract(now));
                              _running = false;
                          }
                      }
                  }
              } /* lock (_gate) */

              if (gotItem)
              {
                  ForwardOnNext(item!);
                  yieldAfterDispatch = true;
                  continue;
              }

              if (completed)
              {
                  ForwardOnCompleted();
              }
              else if (failed)
              {
                  ForwardOnError(failure!);
              }
              else if (reschedule)
              {
                  recurse(this, rescheduleIn);
              }

              return;
          }
      }
  }
  /// <summary>
  /// Delay sink that drains its queue from a single long-running work item. It is only used with
  /// schedulers that support long-running scheduling.
  /// </summary>
  protected abstract class L : _
  {
      protected readonly object _gate = new object();

      // Released once per upstream notification; the drain loop waits on it before inspecting state.
      private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);

      protected L(TParent parent, IObserver<TSource> observer)
          : base(parent, observer)
      {
      }

      // Pending values; Interval holds the elapsed time at which the value is due.
      protected Queue<Reactive.TimeInterval<TSource>> _queue = new Queue<Reactive.TimeInterval<TSource>>();
      protected SerialDisposableValue _cancelable;

      // Offset added to the elapsed time to compute when a notification becomes due.
      protected TimeSpan _delay;

      private bool _hasCompleted;
      private TimeSpan _completeAt;
      private bool _hasFailed;
      private Exception? _exception;

      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (disposing)
          {
              _cancelable.Dispose();
          }
      }

      /// <summary>
      /// Starts the long-running drain loop. Its cancellation source becomes the current cancelable
      /// resource, so disposing the sink cancels the loop's waits.
      /// </summary>
      protected void ScheduleDrain()
      {
          var cd = new CancellationDisposable();
          _cancelable.Disposable = cd;

          _scheduler.AsLongRunning()!.ScheduleLongRunning(cd.Token, DrainQueue); // NB: This class is only used with long-running schedulers.
      }

      /// <summary>Queues the value with its due time and signals the drain loop.</summary>
      public override void OnNext(TSource value)
      {
          lock (_gate)
          {
              var next = Elapsed.Add(_delay);

              _queue.Enqueue(new Reactive.TimeInterval<TSource>(value, next));

              _evt.Release();
          }
      }

      /// <summary>Drops all queued values, records the error and signals the drain loop.</summary>
      public override void OnError(Exception error)
      {
          DisposeUpstream();

          lock (_gate)
          {
              _queue.Clear();

              _exception = error;
              _hasFailed = true;

              _evt.Release();
          }
      }

      /// <summary>Records the due time of the completion and signals the drain loop.</summary>
      public override void OnCompleted()
      {
          DisposeUpstream();

          lock (_gate)
          {
              var next = Elapsed.Add(_delay);

              _completeAt = next;
              _hasCompleted = true;

              _evt.Release();
          }
      }

      /// <summary>
      /// Long-running drain loop. Each iteration waits for one signal, picks the next notification
      /// under the gate, waits until it is due if necessary, and forwards it. The loop ends after a
      /// terminal notification or when <paramref name="token"/> is cancelled.
      /// </summary>
      /// <param name="token">Cancelled when the sink's cancelable resource is disposed.</param>
      /// <param name="cancel">Not used by this loop; present to match the long-running delegate shape.</param>
      private void DrainQueue(CancellationToken token, ICancelable cancel)
      {
          while (true)
          {
              try
              {
                  _evt.Wait(token);
              }
              catch (OperationCanceledException)
              {
                  return;
              }

              var hasFailed = false;
              var error = default(Exception);

              var hasValue = false;
              var value = default(TSource);

              var hasCompleted = false;

              var shouldWait = false;
              var waitTime = default(TimeSpan);

              lock (_gate)
              {
                  if (_hasFailed)
                  {
                      error = _exception;
                      hasFailed = true;
                  }
                  else
                  {
                      var now = Elapsed;

                      if (_queue.Count > 0)
                      {
                          var next = _queue.Dequeue();

                          hasValue = true;
                          value = next.Value;

                          var nextDue = next.Interval;
                          if (nextDue.CompareTo(now) > 0)
                          {
                              shouldWait = true;
                              waitTime = Scheduler.Normalize(nextDue.Subtract(now));
                          }
                      }
                      else if (_hasCompleted)
                      {
                          hasCompleted = true;

                          if (_completeAt.CompareTo(now) > 0)
                          {
                              shouldWait = true;
                              waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
                          }
                      }
                  }
              } /* lock (_gate) */

              if (shouldWait)
              {
                  var timer = new ManualResetEventSlim();

                  // Keep the handle of the wake-up work item so it can be withdrawn from the
                  // scheduler when the wait is cancelled, instead of lingering until its due time.
                  var wakeUp = _scheduler.ScheduleAction(timer, waitTime, static slimTimer => { slimTimer.Set(); });

                  try
                  {
                      timer.Wait(token);
                  }
                  catch (OperationCanceledException)
                  {
                      wakeUp.Dispose();
                      return;
                  }

                  // The event itself is deliberately not disposed here: the wake-up action may still
                  // be inside Set() when Wait returns, and Dispose must not run concurrently with it.
              }

              if (hasValue)
              {
                  ForwardOnNext(value!);
              }
              else
              {
                  if (hasCompleted)
                  {
                      ForwardOnCompleted();
                  }
                  else if (hasFailed)
                  {
                      ForwardOnError(error!);
                  }

                  return;
              }
          }
      }
  }
  360. }
  /// <summary>
  /// Delay producer configured with an absolute due time. Its sinks schedule a start action for
  /// that time; the elapsed time observed when the action runs becomes the delay.
  /// </summary>
  internal sealed class Absolute : Base<Absolute>
  {
      private readonly DateTimeOffset _dueTime;

      public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
          : base(source, scheduler)
      {
          _dueTime = dueTime;
      }

      /// <summary>Creates the long-running sink when the scheduler supports it, the recursive one otherwise.</summary>
      protected override _ CreateSink(IObserver<TSource> observer)
      {
          if (_scheduler.AsLongRunning() != null)
          {
              return new L(this, observer);
          }

          return new S(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(this);
      }

      private new sealed class S : Base<Absolute>.S
      {
          public S(Absolute parent, IObserver<TSource> observer)
              : base(parent, observer)
          {
          }

          protected override void RunCore(Absolute parent)
          {
              // Values are only queued, not drained, until Start has run.
              _ready = false;

              var startTimer = parent._scheduler.ScheduleAction(this, parent._dueTime, static self => self.Start());
              _cancelable.TrySetFirst(startTimer);
          }

          /// <summary>
          /// Runs at the due time: fixes the delay to the elapsed time, shifts the due time of every
          /// value queued so far by that delay, and starts draining if anything was queued.
          /// </summary>
          private void Start()
          {
              var firstDue = default(TimeSpan);
              var drain = false;

              lock (_gate)
              {
                  _delay = Elapsed;

                  var pending = _queue;
                  _queue = new Queue<Reactive.TimeInterval<TSource>>();

                  if (pending.Count > 0)
                  {
                      // The first drain is scheduled using the un-shifted due time of the head item.
                      firstDue = pending.Peek().Interval;

                      foreach (var entry in pending)
                      {
                          var shifted = entry.Interval.Add(_delay);
                          _queue.Enqueue(new Reactive.TimeInterval<TSource>(entry.Value, shifted));
                      }

                      drain = true;
                      _active = true;
                  }

                  _ready = true;
              }

              if (!drain)
              {
                  return;
              }

              DrainQueue(firstDue);
          }
      }

      private new sealed class L : Base<Absolute>.L
      {
          public L(Absolute parent, IObserver<TSource> observer)
              : base(parent, observer)
          {
          }

          protected override void RunCore(Absolute parent)
          {
              // ScheduleDrain may already have installed a newer disposable by the time this
              // assignment happens; replacing it (as TrySetSerial would) would cancel the drain,
              // stop emission and hang the consumer. Hence TrySetFirst.
              var startTimer = parent._scheduler.ScheduleAction(this, parent._dueTime, static self => self.Start());
              _cancelable.TrySetFirst(startTimer);
          }

          /// <summary>
          /// Runs at the due time: fixes the delay to the elapsed time, shifts the due time of every
          /// value queued so far by that delay, and starts the long-running drain loop.
          /// </summary>
          private void Start()
          {
              lock (_gate)
              {
                  _delay = Elapsed;

                  var pending = _queue;
                  _queue = new Queue<Reactive.TimeInterval<TSource>>();

                  foreach (var entry in pending)
                  {
                      var shifted = entry.Interval.Add(_delay);
                      _queue.Enqueue(new Reactive.TimeInterval<TSource>(entry.Value, shifted));
                  }
              }

              ScheduleDrain();
          }
      }
  }
  /// <summary>
  /// Delay producer configured with a relative due time; its sinks use the normalized due time as
  /// the delay from the start.
  /// </summary>
  internal sealed class Relative : Base<Relative>
  {
      private readonly TimeSpan _dueTime;

      public Relative(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
          : base(source, scheduler)
      {
          _dueTime = dueTime;
      }

      /// <summary>Creates the long-running sink when the scheduler supports it, the recursive one otherwise.</summary>
      protected override _ CreateSink(IObserver<TSource> observer)
      {
          if (_scheduler.AsLongRunning() != null)
          {
              return new L(this, observer);
          }

          return new S(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(this);
      }

      private new sealed class S : Base<Relative>.S
      {
          public S(Relative parent, IObserver<TSource> observer)
              : base(parent, observer)
          {
          }

          protected override void RunCore(Relative parent)
          {
              // The delay is known up front, so draining may start with the first notification.
              _ready = true;

              var normalized = Scheduler.Normalize(parent._dueTime);
              _delay = normalized;
          }
      }

      private new sealed class L : Base<Relative>.L
      {
          public L(Relative parent, IObserver<TSource> observer)
              : base(parent, observer)
          {
          }

          protected override void RunCore(Relative parent)
          {
              var normalized = Scheduler.Normalize(parent._dueTime);
              _delay = normalized;

              ScheduleDrain();
          }
      }
  }
  475. }
  476. internal static class Delay<TSource, TDelay>
  477. {
  478. internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._>
  479. where TParent : Base<TParent>
  480. {
  /// <summary>Sequence whose values are delayed; the derived producers subscribe their sinks to it.</summary>
  protected readonly IObservable<TSource> _source;

  /// <summary>Stores the source sequence.</summary>
  /// <param name="source">Sequence to delay.</param>
  protected Base(IObservable<TSource> source) => _source = source;
  /// <summary>
  /// Sink for selector-based delays: each source value gets its own delay sequence from the
  /// selector, and the value is forwarded when that sequence produces its first element or
  /// completes. All forwarding is done while holding a shared gate.
  /// </summary>
  internal abstract class _ : IdentitySink<TSource>
  {
      private readonly object _gate = new object();

      // One entry per value whose delay sequence has not fired yet.
      private readonly CompositeDisposable _delays = new CompositeDisposable();

      private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

      // Set once the source has completed; completion is forwarded when no delays remain.
      private bool _atEnd;

      private SingleAssignmentDisposableValue _subscription;

      protected _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
          : base(observer)
      {
          _delaySelector = delaySelector;
      }

      /// <summary>Variant-specific subscription logic; the returned disposable is kept as the sink's subscription.</summary>
      protected abstract IDisposable RunCore(TParent parent);

      public void Run(TParent parent)
      {
          _atEnd = false;

          var subscription = RunCore(parent);
          _subscription.Disposable = subscription;
      }

      protected override void Dispose(bool disposing)
      {
          if (disposing)
          {
              _subscription.Dispose();
              _delays.Dispose();
          }

          base.Dispose(disposing);
      }

      /// <summary>
      /// Obtains the delay sequence for <paramref name="value"/> and subscribes an observer that
      /// releases the value later. An exception thrown by the selector is forwarded as an error.
      /// </summary>
      public override void OnNext(TSource value)
      {
          IObservable<TDelay> delaySequence;

          try
          {
              delaySequence = _delaySelector(value);
          }
          catch (Exception selectorFailure)
          {
              lock (_gate)
              {
                  ForwardOnError(selectorFailure);
              }

              return;
          }

          var delayObserver = new DelayObserver(this, value);

          // Register before subscribing so the observer is tracked even if the delay fires synchronously.
          _delays.Add(delayObserver);

          var delaySubscription = delaySequence.SubscribeSafe(delayObserver);
          delayObserver.SetResource(delaySubscription);
      }

      public override void OnError(Exception error)
      {
          lock (_gate)
          {
              ForwardOnError(error);
          }
      }

      /// <summary>Marks the source as finished, releases its subscription and completes if no delays are pending.</summary>
      public override void OnCompleted()
      {
          lock (_gate)
          {
              _atEnd = true;
              _subscription.Dispose();

              CheckDone();
          }
      }

      /// <summary>Forwards completion once the source has ended and no delay is outstanding. Called under the gate.</summary>
      private void CheckDone()
      {
          if (!_atEnd || _delays.Count != 0)
          {
              return;
          }

          ForwardOnCompleted();
      }

      /// <summary>Observer of a single delay sequence; holds the value to release when the delay fires.</summary>
      private sealed class DelayObserver : SafeObserver<TDelay>
      {
          private readonly _ _parent;
          private readonly TSource _value;

          // Set by the first OnNext so the value is released only once.
          private bool _once;

          public DelayObserver(_ parent, TSource value)
          {
              _parent = parent;
              _value = value;
          }

          public override void OnNext(TDelay value)
          {
              if (_once)
              {
                  return;
              }

              _once = true;
              ReleaseValue();
          }

          public override void OnError(Exception error)
          {
              lock (_parent._gate)
              {
                  _parent.ForwardOnError(error);
              }
          }

          public override void OnCompleted()
          {
              if (_once)
              {
                  return;
              }

              ReleaseValue();
          }

          // Forwards the held value, removes this observer from the pending set and lets the
          // parent check whether it can now complete; all under the parent's gate.
          private void ReleaseValue()
          {
              lock (_parent._gate)
              {
                  _parent.ForwardOnNext(_value);

                  _parent._delays.Remove(this);
                  _parent.CheckDone();
              }
          }
      }
  }
  599. }
  /// <summary>Selector-based delay producer whose sink subscribes to the source immediately.</summary>
  internal class Selector : Base<Selector>
  {
      private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

      public Selector(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delaySelector)
          : base(source)
      {
          _delaySelector = delaySelector;
      }

      protected override Base<Selector>._ CreateSink(IObserver<TSource> observer)
      {
          return new _(_delaySelector, observer);
      }

      protected override void Run(Base<Selector>._ sink)
      {
          sink.Run(this);
      }

      private new sealed class _ : Base<Selector>._
      {
          public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
              : base(delaySelector, observer)
          {
          }

          /// <summary>Subscribes this sink directly to the parent's source.</summary>
          protected override IDisposable RunCore(Selector parent)
          {
              return parent._source.SubscribeSafe(this);
          }
      }
  }
  /// <summary>
  /// Selector-based delay producer that postpones subscribing to the source until the
  /// subscription-delay sequence produces an element or completes.
  /// </summary>
  internal sealed class SelectorWithSubscriptionDelay : Base<SelectorWithSubscriptionDelay>
  {
      private readonly IObservable<TDelay> _subscriptionDelay;
      private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

      public SelectorWithSubscriptionDelay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
          : base(source)
      {
          _subscriptionDelay = subscriptionDelay;
          _delaySelector = delaySelector;
      }

      protected override Base<SelectorWithSubscriptionDelay>._ CreateSink(IObserver<TSource> observer)
      {
          return new _(_delaySelector, observer);
      }

      protected override void Run(Base<SelectorWithSubscriptionDelay>._ sink)
      {
          sink.Run(this);
      }

      private new sealed class _ : Base<SelectorWithSubscriptionDelay>._
      {
          public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
              : base(delaySelector, observer)
          {
          }

          /// <summary>
          /// Subscribes an observer to the subscription-delay sequence and returns that observer,
          /// which is also the disposable for whatever subscription it currently holds.
          /// </summary>
          protected override IDisposable RunCore(SelectorWithSubscriptionDelay parent)
          {
              var gateKeeper = new SubscriptionDelayObserver(this, parent._source);

              var delaySubscription = parent._subscriptionDelay.SubscribeSafe(gateKeeper);
              gateKeeper.SetFirst(delaySubscription);

              return gateKeeper;
          }

          /// <summary>
          /// Watches the subscription-delay sequence and subscribes the parent sink to the source on
          /// its first element or completion; an error is forwarded to the parent sink.
          /// </summary>
          private sealed class SubscriptionDelayObserver : IObserver<TDelay>, IDisposable
          {
              private readonly _ _parent;
              private readonly IObservable<TSource> _source;

              // Holds the delay subscription first, then the source subscription that replaces it.
              private SerialDisposableValue _subscription;

              public SubscriptionDelayObserver(_ parent, IObservable<TSource> source)
              {
                  _parent = parent;
                  _source = source;
              }

              internal void SetFirst(IDisposable d)
              {
                  _subscription.TrySetFirst(d);
              }

              public void OnNext(TDelay value)
              {
                  SubscribeToSource();
              }

              public void OnError(Exception error)
              {
                  _parent.ForwardOnError(error);
              }

              public void OnCompleted()
              {
                  SubscribeToSource();
              }

              public void Dispose()
              {
                  _subscription.Dispose();
              }

              // Subscribes the parent sink to the source and stores the result as the current subscription.
              private void SubscribeToSource()
              {
                  var sourceSubscription = _source.SubscribeSafe(_parent);
                  _subscription.Disposable = sourceSubscription;
              }
          }
      }
  }
  676. }
  677. }