// ScheduledObserver.cs
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive
  8. {
  9. using Collections.Concurrent;
  10. using Diagnostics;
  /// <summary>
  /// Observer that buffers incoming notifications and replays them to a wrapped observer
  /// from work scheduled on an <see cref="IScheduler"/>.
  /// </summary>
  /// <remarks>
  /// Values are stored in a concurrent queue and the terminal notification is recorded in flag fields.
  /// This class does not start delivery by itself when a notification is recorded; delivery starts
  /// when <see cref="EnsureActive()"/> or <see cref="EnsureActive(int)"/> is called.
  /// If the scheduler's <c>AsLongRunning()</c> returns a non-null value, a single dispatcher loop gated
  /// by a semaphore is used. Otherwise a recursive scheduler action drains one item per invocation,
  /// coordinated through the <c>_state</c> field.
  /// </remarks>
  /// <typeparam name="T">The element type of the sequence.</typeparam>
  internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
  {
      // Values of _state (only used on the non-long-running path).
      private const int Stopped = 0; // No drain action is active.
      private const int Running = 1; // A drain action has been scheduled or is executing.
      private const int Pending = 2; // A producer signalled while Running; the drain must re-check before stopping.
      private const int Faulted = 9; // The wrapped observer's OnNext threw; no further draining takes place.

      private int _state;

      private readonly ConcurrentQueue<T> _queue = new();
      private bool _failed;
      private Exception? _error;
      private bool _completed;

      private readonly IObserver<T> _observer;
      private readonly IScheduler _scheduler;
      private readonly ISchedulerLongRunning? _longRunning;
      private IDisposable? _disposable;

      // Long-running path only: both fields below stay null when _longRunning is null.
      private readonly object _dispatcherInitGate = new();
      private readonly SemaphoreSlim? _dispatcherEvent;
      private readonly IDisposable? _dispatcherEventRelease;
      private IDisposable? _dispatcherJob;

      /// <summary>
      /// Creates an observer that forwards to <paramref name="observer"/> using <paramref name="scheduler"/>.
      /// </summary>
      /// <param name="scheduler">Scheduler on which notifications are delivered.</param>
      /// <param name="observer">Observer that receives the replayed notifications.</param>
      public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
      {
          _scheduler = scheduler;
          _observer = observer;
          _longRunning = _scheduler.AsLongRunning();

          if (_longRunning is not null)
          {
              var gate = new SemaphoreSlim(0);
              _dispatcherEvent = gate;
              _dispatcherEventRelease = new SemaphoreSlimRelease(gate);
          }
      }

      /// <summary>
      /// Starts the long-running dispatcher the first time it is needed.
      /// </summary>
      private void EnsureDispatcher()
      {
          if (_dispatcherJob is not null)
          {
              return;
          }

          lock (_dispatcherInitGate)
          {
              // Re-check under the lock so only one caller schedules the job.
              if (_dispatcherJob is not null)
              {
                  return;
              }

              // NB: Only reachable when long-running.
              var job = _longRunning!.ScheduleLongRunning(Dispatch);
              _dispatcherJob = job;

              // Disposing this observer disposes the job and releases the semaphore once.
              Disposable.TrySetSerial(ref _disposable, StableCompositeDisposable.Create
              (
                  job,
                  _dispatcherEventRelease
              ));
          }
      }

      /// <summary>
      /// Body of the long-running dispatcher. It waits on the semaphore, then forwards queued values
      /// and finally the recorded terminal notification.
      /// </summary>
      /// <param name="cancel">Checked after every wait; the loop exits once it reports disposed.</param>
      private void Dispatch(ICancelable cancel)
      {
          // NB: If long-running, the event is set.
          var gate = _dispatcherEvent!;

          for (; ; )
          {
              gate.Wait();

              if (cancel.IsDisposed)
              {
                  return;
              }

              while (_queue.TryDequeue(out var item))
              {
                  try
                  {
                      _observer.OnNext(item);
                  }
                  catch
                  {
                      DiscardQueuedItems();
                      throw;
                  }

                  // Wait again before looking at the queue for the next item.
                  gate.Wait();

                  if (cancel.IsDisposed)
                  {
                      return;
                  }
              }

              if (_failed)
              {
                  _observer.OnError(_error!);
                  Dispose();
                  return;
              }

              if (_completed)
              {
                  _observer.OnCompleted();
                  Dispose();
                  return;
              }
          }
      }

      /// <summary>
      /// Equivalent to <c>EnsureActive(1)</c>.
      /// </summary>
      public void EnsureActive()
      {
          EnsureActive(1);
      }

      /// <summary>
      /// Makes sure delivery work is under way for previously recorded notifications.
      /// </summary>
      /// <param name="n">
      /// On the long-running path, the number of permits released on the dispatcher semaphore
      /// (nothing is released unless the value is positive). Not used on the other path.
      /// </param>
      public void EnsureActive(int n)
      {
          if (_longRunning is null)
          {
              EnsureActiveSlow();
              return;
          }

          if (n > 0)
          {
              // NB: If long-running, the event is set.
              _dispatcherEvent!.Release(n);
          }

          EnsureDispatcher();
      }

      /// <summary>
      /// Non-long-running activation: schedules <see cref="Run"/> if no drain is active, or flags
      /// an active drain so that it re-checks its inputs before stopping.
      /// </summary>
      private void EnsureActiveSlow()
      {
          for (; ; )
          {
              var previous = Interlocked.CompareExchange(ref _state, Running, Stopped);

              if (previous == Stopped)
              {
                  // We performed the Stopped to Running transition, so scheduling the drain is our job.
                  Disposable.TrySetSerial(ref _disposable, _scheduler.Schedule<object?>(null, Run));
                  return;
              }

              if (previous == Faulted || previous == Pending)
              {
                  // Faulted: draining has been abandoned. Pending: the drain has already been flagged.
                  return;
              }

              // The drain is Running. Merely observing that is not enough:
              //
              //   consumer: finds the queue empty (still Running)
              //   producer: enqueues an item, calls EnsureActive, sees Running, does nothing
              //   consumer: transitions to Stopped
              //
              // would leave the item stranded. Instead the producer moves Running to Pending, and the
              // consumer only stops via a CAS from Running, so on Pending it loops and re-reads the
              // queue and the terminal flags. If our CAS loses a race, start over with a fresh read.
              if (previous == Running && Interlocked.CompareExchange(ref _state, Pending, Running) == Running)
              {
                  return;
              }
          }
      }

      /// <summary>
      /// Recursive scheduler action: forwards at most one queued value, or the terminal notification
      /// once the queue is empty, then reschedules itself through <paramref name="recurse"/>.
      /// </summary>
      /// <param name="state">Unused state object, passed along to <paramref name="recurse"/>.</param>
      /// <param name="recurse">Callback that schedules the next invocation of this method.</param>
      private void Run(object? state, Action<object?> recurse)
      {
          T item;

          while (!_queue.TryDequeue(out item))
          {
              if (_failed)
              {
                  // An OnNext may have been enqueued between the failed dequeue above and the
                  // producer setting _failed. Only terminate if the queue is still empty,
                  // otherwise go around and deliver that item first.
                  if (_queue.IsEmpty)
                  {
                      Interlocked.Exchange(ref _state, Stopped);
                      _observer.OnError(_error!);
                      Dispose();
                      return;
                  }

                  continue;
              }

              if (_completed)
              {
                  // Same reasoning as for _failed: do not complete ahead of a late-queued item.
                  if (_queue.IsEmpty)
                  {
                      Interlocked.Exchange(ref _state, Stopped);
                      _observer.OnCompleted();
                      Dispose();
                      return;
                  }

                  continue;
              }

              var previous = Interlocked.CompareExchange(ref _state, Stopped, Running);
              if (previous == Running || previous == Faulted)
              {
                  return;
              }

              Debug.Assert(previous == Pending);

              // A producer flagged us as Pending to veto the transition to Stopped,
              // so go back to Running and look at the queue and flags again.
              _state = Running;
          }

          Interlocked.Exchange(ref _state, Running);

          try
          {
              _observer.OnNext(item);
          }
          catch
          {
              Interlocked.Exchange(ref _state, Faulted);
              DiscardQueuedItems();
              throw;
          }

          recurse(state);
      }

      /// <summary>
      /// Dequeues and drops everything currently in the queue.
      /// </summary>
      private void DiscardQueuedItems()
      {
          while (_queue.TryDequeue(out _))
          {
          }
      }

      /// <summary>Records a value by enqueueing it; delivery is not started here.</summary>
      protected override void OnNextCore(T value) => _queue.Enqueue(value);

      /// <summary>Records the error; delivery is not started here.</summary>
      protected override void OnErrorCore(Exception exception)
      {
          // Store the exception before raising the flag that consumers test.
          _error = exception;
          _failed = true;
      }

      /// <summary>Records completion; delivery is not started here.</summary>
      protected override void OnCompletedCore() => _completed = true;

      /// <summary>
      /// Disposes the base observer and, when <paramref name="disposing"/> is true, the scheduled work.
      /// </summary>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          Disposable.TryDispose(ref _disposable);
      }

      /// <summary>
      /// Disposable that releases a semaphore once on the first <see cref="Dispose"/> call;
      /// later calls do nothing. Presumably used to wake a dispatcher blocked in <c>Wait()</c>
      /// so it can observe cancellation.
      /// </summary>
      private sealed class SemaphoreSlimRelease : IDisposable
      {
          private volatile SemaphoreSlim? _dispatcherEvent;

          public SemaphoreSlimRelease(SemaphoreSlim dispatcherEvent)
          {
              _dispatcherEvent = dispatcherEvent;
          }

          public void Dispose()
          {
              // Atomically take the reference so that only one caller performs the release.
              var gate = Interlocked.Exchange(ref _dispatcherEvent, null);
              if (gate is not null)
              {
                  gate.Release();
              }
          }
      }
  }
  /// <summary>
  /// A <see cref="ScheduledObserver{T}"/> that activates delivery after every recorded notification
  /// and owns the subscription to its source.
  /// </summary>
  /// <typeparam name="T">The element type of the sequence.</typeparam>
  internal sealed class ObserveOnObserver<T> : ScheduledObserver<T>
  {
      // Subscription to the upstream source, assigned by Run.
      private IDisposable? _subscription;

      /// <summary>
      /// Creates an observer that forwards to <paramref name="observer"/> using <paramref name="scheduler"/>.
      /// </summary>
      public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer)
          : base(scheduler, observer)
      {
      }

      /// <summary>
      /// Subscribes this observer to <paramref name="source"/> and keeps the resulting subscription.
      /// </summary>
      /// <param name="source">The sequence to observe.</param>
      public void Run(IObservable<T> source) =>
          Disposable.SetSingle(ref _subscription, source.SubscribeSafe(this));

      /// <summary>Records the value, then makes sure delivery is active.</summary>
      protected override void OnNextCore(T value)
      {
          base.OnNextCore(value);
          EnsureActive();
      }

      /// <summary>Records the error, then makes sure delivery is active.</summary>
      protected override void OnErrorCore(Exception exception)
      {
          base.OnErrorCore(exception);
          EnsureActive();
      }

      /// <summary>Records completion, then makes sure delivery is active.</summary>
      protected override void OnCompletedCore()
      {
          base.OnCompletedCore();
          EnsureActive();
      }

      /// <summary>
      /// Disposes the base observer and, when <paramref name="disposing"/> is true, the source subscription.
      /// </summary>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          Disposable.TryDispose(ref _subscription);
      }
  }
  /// <summary>
  /// A disposable observer whose delivery work has to be activated explicitly.
  /// </summary>
  /// <typeparam name="T">The element type of the sequence.</typeparam>
  internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
  {
      /// <summary>
      /// Makes sure delivery work is active, passing a count to the implementation.
      /// </summary>
      /// <param name="count">
      /// Implementation-defined amount. In <see cref="ScheduledObserver{T}"/> it is the number of
      /// dispatcher permits released when a long-running scheduler is in use.
      /// </param>
      void EnsureActive(int count);

      /// <summary>
      /// Makes sure delivery work is active. <see cref="ScheduledObserver{T}"/> implements this
      /// as <c>EnsureActive(1)</c>.
      /// </summary>
      void EnsureActive();
  }
  /// <summary>
  /// ObserveOn implementation that relays upstream signals to the downstream observer on a
  /// scheduler, coordinating producers and the drain with interlocked and volatile operations.
  /// </summary>
  /// <typeparam name="T">The element type of the sequence.</typeparam>
  internal sealed class ObserveOnObserverNew<T> : IdentitySink<T>
  {
      /// <summary>
      /// Static delegate for scheduling a drain, so that no delegate capturing <c>this</c>
      /// has to be created for each drain; the instance travels as the scheduler state.
      /// </summary>
      private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DrainShortRunningFunc =
          static (scheduler, self) => self.DrainShortRunning(scheduler);

      private readonly IScheduler _scheduler;

      /// <summary>
      /// Holds OnNext values until they are drained; terminal signals use <see cref="_done"/>
      /// and <see cref="_error"/> instead.
      /// </summary>
      private readonly ConcurrentQueue<T> _queue;

      /// <summary>
      /// Disposable for the currently scheduled drain work.
      /// </summary>
      private IDisposable? _task;

      /// <summary>
      /// Number of signals not yet accounted for by the drain; zero means no drain is scheduled or running.
      /// </summary>
      private int _wip;

      /// <summary>
      /// True once the upstream has terminated (OnCompleted or OnError).
      /// </summary>
      private bool _done;

      /// <summary>
      /// The upstream error, if it terminated via OnError. Written before <see cref="_done"/> is set.
      /// </summary>
      private Exception? _error;

      /// <summary>
      /// True once disposal was requested or a terminal signal was forwarded.
      /// </summary>
      private bool _disposed;

      public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream) : base(downstream)
      {
          _scheduler = scheduler;
          _queue = new ConcurrentQueue<T>();
      }

      /// <summary>
      /// Flags the operator as disposed, disposes the base sink and, when <paramref name="disposing"/>
      /// is true, cancels the scheduled drain and empties the queue.
      /// </summary>
      protected override void Dispose(bool disposing)
      {
          Volatile.Write(ref _disposed, true);

          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          Disposable.TryDispose(ref _task);
          Clear(_queue);
      }

      /// <summary>
      /// Drops every element remaining in the queue; used on cancellation or failure.
      /// </summary>
      /// <param name="q">The queue to empty. It is passed in so that callers which already hold the
      /// reference do not have to re-read the field around each TryDequeue.</param>
      private static void Clear(ConcurrentQueue<T> q)
      {
          while (q.TryDequeue(out _))
          {
          }
      }

      /// <summary>Marks the upstream as terminated and requests a drain.</summary>
      public override void OnCompleted()
      {
          Volatile.Write(ref _done, true);
          Schedule();
      }

      /// <summary>Stores the error, marks the upstream as terminated and requests a drain.</summary>
      public override void OnError(Exception error)
      {
          // The plain write of _error must precede the volatile write of _done;
          // DrainStep reads them in the opposite order.
          _error = error;
          Volatile.Write(ref _done, true);
          Schedule();
      }

      /// <summary>Enqueues the value and requests a drain.</summary>
      public override void OnNext(T value)
      {
          _queue.Enqueue(value);
          Schedule();
      }

      /// <summary>
      /// Counts one more pending signal and, if that moved the work-in-progress counter from
      /// zero to one (no drain was active), submits a drain to the scheduler.
      /// </summary>
      private void Schedule()
      {
          if (Interlocked.Increment(ref _wip) != 1)
          {
              // A drain is already scheduled or running; it will pick this signal up.
              return;
          }

          // Register a placeholder first so the scheduled work is tracked in _task
          // even if the scheduler runs it before Schedule returns.
          var placeholder = new SingleAssignmentDisposable();

          if (Disposable.TrySetMultiple(ref _task, placeholder))
          {
              placeholder.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc);
          }

          // If there was a cancellation, clear the queue of items. This does not need
          // the exclusive (wip != 0) mode because the queue supports multiple consumers.
          if (Volatile.Read(ref _disposed))
          {
              Clear(_queue);
          }
      }

      /// <summary>
      /// Performs a single drain step and reschedules itself if more signals are outstanding,
      /// so that each scheduler invocation emits at most one signal.
      /// </summary>
      /// <param name="recursiveScheduler">The scheduler used to schedule the next step if necessary.</param>
      /// <returns>Always an empty disposable; follow-up work is tracked through <c>_task</c>.</returns>
      private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
      {
          DrainStep(_queue);

          if (Interlocked.Decrement(ref _wip) == 0)
          {
              return Disposable.Empty;
          }

          // The disposable of the follow-up Schedule() is deliberately not returned: that could
          // chain together a long string of ScheduledItems and overflow the stack upon Dispose().
          var followUp = recursiveScheduler.Schedule(this, DrainShortRunningFunc);
          Disposable.TrySetMultiple(ref _task, followUp);

          return Disposable.Empty;
      }

      /// <summary>
      /// Emits at most one signal: nothing if disposed, otherwise a pending error, otherwise the
      /// next queued item, otherwise completion if the upstream is done and the queue is empty.
      /// </summary>
      /// <param name="q">The queue to read from, passed in to avoid re-reading the field.</param>
      private void DrainStep(ConcurrentQueue<T> q)
      {
          // Disposed: just drop whatever is left.
          if (Volatile.Read(ref _disposed))
          {
              Clear(q);
              return;
          }

          // Capture the terminated flag before touching the queue.
          var upstreamDone = Volatile.Read(ref _done);

          // OnError writes _error before the volatile write of _done, so after a volatile
          // read of _done returned true a plain read of _error is sufficient.
          // An error is forwarded without delivering items still in the queue.
          if (upstreamDone && _error is { } failure)
          {
              Volatile.Write(ref _disposed, true);
              ForwardOnError(failure);
              return;
          }

          // Deliver the next item, if any.
          if (q.TryDequeue(out var item))
          {
              ForwardOnNext(item);
              return;
          }

          if (!upstreamDone)
          {
              return;
          }

          // Upstream completed and the queue is empty: no further signals can happen.
          Volatile.Write(ref _disposed, true);
          ForwardOnCompleted();
      }
  }
  /// <summary>
  /// Signals events on an <see cref="ISchedulerLongRunning"/> by blocking the emission thread
  /// while waiting for them from the upstream.
  /// </summary>
  /// <typeparam name="TSource">The element type of the sequence.</typeparam>
  internal sealed class ObserveOnObserverLongRunning<TSource> : IdentitySink<TSource>
  {
      /// <summary>
      /// Static reference to the Drain method, saves allocation.
      /// </summary>
      private static readonly Action<ObserveOnObserverLongRunning<TSource>, ICancelable> DrainLongRunning =
          static (self, cancelable) => self.Drain();

      /// <summary>
      /// Runs the suspending drain task, which occupies its backing thread
      /// until the sequence terminates or gets disposed.
      /// </summary>
      private readonly ISchedulerLongRunning _scheduler;

      /// <summary>
      /// The queue for holding the OnNext items; terminal signals have their own fields.
      /// </summary>
      private readonly ConcurrentQueue<TSource> _queue;

      /// <summary>
      /// Protects the suspension and resumption of the long running drain task.
      /// </summary>
      private readonly object _suspendGuard;

      /// <summary>
      /// The work-in-progress counter. When it goes from 0 to 1 the drain task is pulsed awake;
      /// when the drain finds it at 0 it suspends itself.
      /// </summary>
      private long _wip;

      /// <summary>
      /// Set to true if the upstream terminated.
      /// </summary>
      private bool _done;

      /// <summary>
      /// Set to a non-null Exception if the upstream terminated with OnError.
      /// Written before <see cref="_done"/> is set.
      /// </summary>
      private Exception? _error;

      /// <summary>
      /// Indicates the sequence has been disposed and the drain task should quit.
      /// </summary>
      private bool _disposed;

      /// <summary>
      /// Makes sure the drain task is scheduled only once, when the first signal
      /// from upstream arrives.
      /// </summary>
      private int _runDrainOnce;

      /// <summary>
      /// The disposable tracking the drain task.
      /// </summary>
      private IDisposable? _drainTask;

      public ObserveOnObserverLongRunning(ISchedulerLongRunning scheduler, IObserver<TSource> observer) : base(observer)
      {
          _scheduler = scheduler;
          _queue = new ConcurrentQueue<TSource>();
          _suspendGuard = new object();
      }

      /// <summary>Marks the upstream as terminated and wakes the drain task.</summary>
      public override void OnCompleted()
      {
          Volatile.Write(ref _done, true);
          Schedule();
      }

      /// <summary>Stores the error, marks the upstream as terminated and wakes the drain task.</summary>
      public override void OnError(Exception error)
      {
          // _error is written before the volatile write of _done; Drain reads _done first.
          _error = error;
          Volatile.Write(ref _done, true);
          Schedule();
      }

      /// <summary>Enqueues the value and wakes the drain task.</summary>
      public override void OnNext(TSource value)
      {
          _queue.Enqueue(value);
          Schedule();
      }

      /// <summary>
      /// Starts the drain task on the first call and signals that more work is available.
      /// </summary>
      private void Schedule()
      {
          // Schedule the suspending drain once. The volatile read avoids the CAS on every later call.
          if (Volatile.Read(ref _runDrainOnce) == 0
              && Interlocked.CompareExchange(ref _runDrainOnce, 1, 0) == 0)
          {
              Disposable.SetSingle(ref _drainTask, _scheduler.ScheduleLongRunning(this, DrainLongRunning));
          }

          // Indicate more work is to be done by the drain loop
          if (Interlocked.Increment(ref _wip) == 1L)
          {
              // resume the drain loop waiting on the guard
              lock (_suspendGuard)
              {
                  Monitor.Pulse(_suspendGuard);
              }
          }
      }

      /// <summary>
      /// Flags the sequence as disposed, wakes the drain task so it can observe the flag,
      /// disposes the drain task handle and then the base sink.
      /// </summary>
      protected override void Dispose(bool disposing)
      {
          // Indicate the drain task should quit
          Volatile.Write(ref _disposed, true);

          // Resume the drain task in case it sleeps
          lock (_suspendGuard)
          {
              Monitor.Pulse(_suspendGuard);
          }

          // Cancel the drain task handle.
          Disposable.TryDispose(ref _drainTask);

          base.Dispose(disposing);
      }

      /// <summary>
      /// The long-running drain loop: forwards queued items and the terminal signal to the
      /// downstream, and suspends on <see cref="_suspendGuard"/> while there is no work.
      /// </summary>
      private void Drain()
      {
          var q = _queue;

          for (; ; )
          {
              // If the sequence was disposed, clear the queue and quit
              if (Volatile.Read(ref _disposed))
              {
                  while (q.TryDequeue(out _))
                  {
                  }

                  break;
              }

              // Has the upstream terminated? Read this before polling the queue, so that a
              // failed dequeue together with isDone means every item has been consumed.
              var isDone = Volatile.Read(ref _done);

              // Do we have an item in the queue
              var hasValue = q.TryDequeue(out var item);

              // If the upstream has terminated and no further items are in the queue
              if (isDone && !hasValue)
              {
                  // Find out if the upstream terminated with an error and signal accordingly.
                  var e = _error;
                  if (e != null)
                  {
                      ForwardOnError(e);
                  }
                  else
                  {
                      ForwardOnCompleted();
                  }

                  break;
              }

              // There was an item, signal it.
              if (hasValue)
              {
                  ForwardOnNext(item!);

                  // Consume the item and try the next item if the work-in-progress
                  // indicator is still not zero
                  if (Interlocked.Decrement(ref _wip) != 0L)
                  {
                      continue;
                  }
              }

              // If we run out of work and the sequence is not disposed
              if (Volatile.Read(ref _wip) == 0L && !Volatile.Read(ref _disposed))
              {
                  var g = _suspendGuard;

                  // try sleeping, if we can't even enter the lock, the producer
                  // side is currently trying to resume us
                  if (Monitor.TryEnter(g))
                  {
                      try
                      {
                          // Make sure again there is still no work and the sequence is not disposed
                          if (Volatile.Read(ref _wip) == 0L && !Volatile.Read(ref _disposed))
                          {
                              // wait for a Pulse(g)
                              Monitor.Wait(g);
                          }
                      }
                      finally
                      {
                          // Always release the guard, even if Wait throws (for example on a thread
                          // interrupt); otherwise producers and Dispose would block on the lock.
                          Monitor.Exit(g);
                      }
                  }
              }
          }
      }
  }
  660. }