ScheduledObserver.cs 22 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive
  8. {
  9. using Collections.Concurrent;
  10. using Diagnostics;
  /// <summary>
  /// Observer that buffers incoming notifications in a queue and replays them to the
  /// wrapped observer on the supplied scheduler.
  /// </summary>
  /// <remarks>
  /// Two delivery strategies exist. When the scheduler provides long-running support,
  /// a single dispatcher loop (<c>Dispatch</c>) is started lazily and woken through a
  /// semaphore. Otherwise <c>Run</c> is scheduled recursively and coordinated with
  /// producers through the <c>_state</c> field.
  /// </remarks>
  /// <typeparam name="T">The element type of the sequence.</typeparam>
  internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
  {
      // Current state of the recursive (non-long-running) consumer; see the constants below.
      private int _state;

      // No consumer is scheduled or running.
      private const int STOPPED = 0;

      // A consumer has been scheduled or is currently draining the queue.
      private const int RUNNING = 1;

      // A producer signalled new work while the consumer was RUNNING.
      private const int PENDING = 2;

      // The wrapped observer's OnNext threw; no further scheduling takes place.
      private const int FAULTED = 9;

      private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

      // Terminal state recorded by OnErrorCore / OnCompletedCore and read by the consumer.
      private bool _failed;
      private Exception _error;
      private bool _completed;

      private readonly IObserver<T> _observer;
      private readonly IScheduler _scheduler;

      // Null when the scheduler has no long-running support.
      private readonly ISchedulerLongRunning _longRunning;

      private IDisposable _disposable;

      /// <summary>
      /// Creates an observer that forwards notifications to <paramref name="observer"/>
      /// using <paramref name="scheduler"/>.
      /// </summary>
      /// <param name="scheduler">Scheduler used to run the delivery of notifications.</param>
      /// <param name="observer">Observer that receives the queued notifications.</param>
      public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
      {
          _scheduler = scheduler;
          _observer = observer;
          _longRunning = _scheduler.AsLongRunning();

          // The semaphore is only needed by the long-running dispatcher loop.
          if (_longRunning != null)
          {
              _dispatcherEvent = new SemaphoreSlim(0);
              _dispatcherEventRelease = new SemaphoreSlimRelease(_dispatcherEvent);
          }
      }

      /// <summary>
      /// Disposable that releases the given semaphore once, on the first call to
      /// <see cref="Dispose"/>; later calls do nothing.
      /// </summary>
      private sealed class SemaphoreSlimRelease : IDisposable
      {
          private SemaphoreSlim _dispatcherEvent;

          public SemaphoreSlimRelease(SemaphoreSlim dispatcherEvent)
          {
              Volatile.Write(ref _dispatcherEvent, dispatcherEvent);
          }

          public void Dispose()
          {
              // Swapping in null guarantees a single Release even with concurrent disposal.
              Interlocked.Exchange(ref _dispatcherEvent, null)?.Release();
          }
      }

      private readonly object _dispatcherInitGate = new object();
      private readonly SemaphoreSlim _dispatcherEvent;
      private readonly IDisposable _dispatcherEventRelease;
      private IDisposable _dispatcherJob;

      /// <summary>
      /// Starts the long-running dispatcher job if it has not been started yet.
      /// </summary>
      private void EnsureDispatcher()
      {
          if (_dispatcherJob == null)
          {
              lock (_dispatcherInitGate)
              {
                  // Re-check under the lock so that only one job is ever scheduled.
                  if (_dispatcherJob == null)
                  {
                      _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);

                      // Disposing this observer disposes the job and releases the
                      // semaphore through _dispatcherEventRelease.
                      Disposable.TrySetSerial(ref _disposable, StableCompositeDisposable.Create
                      (
                          _dispatcherJob,
                          _dispatcherEventRelease
                      ));
                  }
              }
          }
      }

      /// <summary>
      /// Body of the long-running dispatcher: waits on the semaphore, forwards queued
      /// items to the wrapped observer, and finishes with OnError or OnCompleted once a
      /// terminal state has been recorded and the queue was found empty.
      /// </summary>
      /// <param name="cancel">Checked after every wait; the loop exits once it is disposed.</param>
      private void Dispatch(ICancelable cancel)
      {
          while (true)
          {
              _dispatcherEvent.Wait();

              if (cancel.IsDisposed)
              {
                  return;
              }

              while (_queue.TryDequeue(out var next))
              {
                  try
                  {
                      _observer.OnNext(next);
                  }
                  catch
                  {
                      // Drop everything still queued before propagating the failure.
                      while (_queue.TryDequeue(out _))
                      {
                      }

                      throw;
                  }

                  // One wait per delivered item before attempting the next dequeue.
                  _dispatcherEvent.Wait();

                  if (cancel.IsDisposed)
                  {
                      return;
                  }
              }

              if (_failed)
              {
                  _observer.OnError(_error);
                  Dispose();
                  return;
              }

              if (_completed)
              {
                  _observer.OnCompleted();
                  Dispose();
                  return;
              }
          }
      }

      /// <summary>
      /// Signals that one notification has been made available for delivery.
      /// </summary>
      public void EnsureActive() => EnsureActive(1);

      /// <summary>
      /// Signals that notifications have been made available for delivery and makes
      /// sure a consumer is active to deliver them.
      /// </summary>
      /// <param name="n">
      /// Number of times the dispatcher semaphore is released on the long-running path
      /// (values less than or equal to zero release nothing). Not used on the
      /// recursive-scheduling path.
      /// </param>
      public void EnsureActive(int n)
      {
          if (_longRunning != null)
          {
              if (n > 0)
              {
                  _dispatcherEvent.Release(n);
              }

              EnsureDispatcher();
          }
          else
          {
              EnsureActiveSlow();
          }
      }

      /// <summary>
      /// Recursive-scheduling path of <see cref="EnsureActive(int)"/>: schedules
      /// <c>Run</c> if no consumer is active, or flags a running consumer as PENDING so
      /// that it re-checks the queue before stopping.
      /// </summary>
      private void EnsureActiveSlow()
      {
          var isOwner = false;

          while (true)
          {
              var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);

              if (old == STOPPED)
              {
                  isOwner = true; // RUNNING
                  break;
              }

              if (old == FAULTED)
              {
                  return;
              }

              // If we find the consumer loop running, we transition to PENDING to handle
              // the case where the queue is seen empty by the consumer, making it transition
              // to the STOPPED state, but we inserted an item into the queue.
              //
              // C: _queue.TryDequeue == false                  (RUNNING)
              // ----------------------------------------------
              // P: _queue.Enqueue(...)
              //    EnsureActive
              //      Exchange(ref _state, RUNNING) == RUNNING
              // ----------------------------------------------
              // C: transition to STOPPED                       (STOPPED)
              //
              // In this case, P would believe C is running and not invoke the scheduler
              // using the isOwner flag.
              //
              // By introducing an intermediate PENDING state and using CAS in the consumer
              // to only transition to STOPPED in case we were still RUNNING, we can force
              // the consumer to reconsider the decision to transition to STOPPED. In that
              // case, the consumer loops again and re-reads from the queue and other state
              // fields. At least one bit of state will have changed because EnsureActive
              // should only be called after invocation of IObserver<T> methods that touch
              // this state.
              //
              if (old == PENDING
                  || (old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING))
              {
                  break;
              }

              // Otherwise the state changed between the two CAS attempts; retry.
          }

          if (isOwner)
          {
              Disposable.TrySetSerial(ref _disposable, _scheduler.Schedule<object>(null, Run));
          }
      }

      /// <summary>
      /// Recursive consumer step: delivers at most one queued item to the wrapped
      /// observer and reschedules itself, or delivers the terminal notification, or
      /// moves to STOPPED when there is nothing left to do.
      /// </summary>
      /// <param name="state">Opaque state handed back to <paramref name="recurse"/>.</param>
      /// <param name="recurse">Callback that schedules the next step.</param>
      private void Run(object state, Action<object> recurse)
      {
          var next = default(T);

          while (!_queue.TryDequeue(out next))
          {
              if (_failed)
              {
                  // Between the failed dequeue above and observing _failed, items could
                  // have been queued, so we can't stop yet. Instead of terminating we
                  // loop around and re-check the queue right away.
                  //
                  // C: _queue.TryDequeue == false
                  // ----------------------------------------------
                  // P: OnNext(...)
                  //      _queue.Enqueue(...)      // Would get lost
                  // P: OnError(...)
                  //      _failed = true
                  // ----------------------------------------------
                  // C: if (_failed)
                  //      _observer.OnError(...)   // Lost an OnNext
                  //
                  if (!_queue.IsEmpty)
                  {
                      continue;
                  }

                  Interlocked.Exchange(ref _state, STOPPED);
                  _observer.OnError(_error);
                  Dispose();
                  return;
              }

              if (_completed)
              {
                  // Between the failed dequeue above and observing _completed, items
                  // could have been queued, so we can't stop yet. Instead of terminating
                  // we loop around and re-check the queue right away.
                  //
                  // C: _queue.TryDequeue == false
                  // ----------------------------------------------
                  // P: OnNext(...)
                  //      _queue.Enqueue(...)      // Would get lost
                  // P: OnCompleted(...)
                  //      _completed = true
                  // ----------------------------------------------
                  // C: if (_completed)
                  //      _observer.OnCompleted()  // Lost an OnNext
                  //
                  if (!_queue.IsEmpty)
                  {
                      continue;
                  }

                  Interlocked.Exchange(ref _state, STOPPED);
                  _observer.OnCompleted();
                  Dispose();
                  return;
              }

              var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);

              if (old == RUNNING || old == FAULTED)
              {
                  return;
              }

              Debug.Assert(old == PENDING);

              // The producer has put us in the PENDING state to prevent us from
              // transitioning to STOPPED, so we go RUNNING again and re-check our state.
              //
              // This must be a fenced write, like every other access to _state: with a
              // plain store, the re-read of the queue at the top of the loop could be
              // ordered ahead of the store becoming visible. A producer could then still
              // observe PENDING (and not schedule a consumer) while we observe an empty
              // queue and subsequently CAS RUNNING -> STOPPED, stranding its item.
              Interlocked.Exchange(ref _state, RUNNING);
          }

          Interlocked.Exchange(ref _state, RUNNING);

          try
          {
              _observer.OnNext(next);
          }
          catch
          {
              // Mark the observer as faulted and drop everything still queued before
              // propagating the failure.
              Interlocked.Exchange(ref _state, FAULTED);

              while (_queue.TryDequeue(out _))
              {
              }

              throw;
          }

          recurse(state);
      }

      /// <summary>Queues the value; delivery happens once the consumer is activated.</summary>
      protected override void OnNextCore(T value)
      {
          _queue.Enqueue(value);
      }

      /// <summary>Records the error; it is delivered by the consumer after the queued items.</summary>
      protected override void OnErrorCore(Exception exception)
      {
          // Store the exception before raising the flag the consumer checks.
          _error = exception;
          _failed = true;
      }

      /// <summary>Records completion; it is delivered by the consumer after the queued items.</summary>
      protected override void OnCompletedCore()
      {
          _completed = true;
      }

      /// <summary>
      /// Disposes the base observer and, when <paramref name="disposing"/> is true,
      /// the currently scheduled work.
      /// </summary>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (disposing)
          {
              Disposable.TryDispose(ref _disposable);
          }
      }
  }
  /// <summary>
  /// A <see cref="ScheduledObserver{T}"/> that subscribes itself to a source sequence
  /// and activates delivery after every notification it receives.
  /// </summary>
  /// <typeparam name="T">The element type of the sequence.</typeparam>
  internal sealed class ObserveOnObserver<T> : ScheduledObserver<T>
  {
      // Subscription to the source sequence; assigned by Run.
      private IDisposable _run;

      /// <summary>
      /// Creates an observer that delivers notifications to <paramref name="observer"/>
      /// on <paramref name="scheduler"/>.
      /// </summary>
      public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer)
          : base(scheduler, observer)
      {
      }

      /// <summary>
      /// Subscribes this observer to <paramref name="source"/> and keeps the resulting
      /// subscription so it can be disposed together with this observer.
      /// </summary>
      /// <param name="source">The sequence to observe.</param>
      public void Run(IObservable<T> source) =>
          Disposable.SetSingle(ref _run, source.SubscribeSafe(this));

      /// <summary>Queues the value, then makes sure a consumer is active.</summary>
      protected override void OnNextCore(T value)
      {
          base.OnNextCore(value);
          EnsureActive();
      }

      /// <summary>Records the error, then makes sure a consumer is active.</summary>
      protected override void OnErrorCore(Exception exception)
      {
          base.OnErrorCore(exception);
          EnsureActive();
      }

      /// <summary>Records completion, then makes sure a consumer is active.</summary>
      protected override void OnCompletedCore()
      {
          base.OnCompletedCore();
          EnsureActive();
      }

      /// <summary>
      /// Disposes the base observer and, when <paramref name="disposing"/> is true,
      /// the subscription to the source.
      /// </summary>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          Disposable.TryDispose(ref _run);
      }
  }
  /// <summary>
  /// An observer whose delivery of notifications has to be activated explicitly
  /// after notifications were handed to it.
  /// </summary>
  /// <typeparam name="T">The element type of the sequence.</typeparam>
  internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
  {
      /// <summary>
      /// Makes sure the observer is actively delivering its notifications.
      /// </summary>
      void EnsureActive();

      /// <summary>
      /// Makes sure the observer is actively delivering its notifications.
      /// </summary>
      /// <param name="count">
      /// Number of notifications being signalled. NOTE(review): inferred from
      /// <c>ScheduledObserver&lt;T&gt;.EnsureActive(int)</c>, which releases its
      /// dispatcher semaphore this many times; confirm for other implementations.
      /// </param>
      void EnsureActive(int count);
  }
  /// <summary>
  /// ObserveOn implementation that hands signals to the downstream on a scheduler.
  /// Producers and the drain task are coordinated through a work-in-progress
  /// counter and a concurrent queue rather than locks.
  /// </summary>
  /// <typeparam name="T">The element type of the sequence.</typeparam>
  internal sealed class ObserveOnObserverNew<T> : IdentitySink<T>
  {
      private readonly IScheduler _scheduler;

      /// <summary>
      /// Non-null when <see cref="_scheduler"/> supports long-running tasks.
      /// </summary>
      private readonly ISchedulerLongRunning _longRunning;

      private readonly ConcurrentQueue<T> _queue;

      /// <summary>
      /// Disposable of the currently scheduled drain task.
      /// </summary>
      private IDisposable _task;

      /// <summary>
      /// Work-in-progress counter: zero means no drain is scheduled or running.
      /// </summary>
      private int _wip;

      /// <summary>
      /// True once the upstream has signalled OnCompleted or OnError.
      /// </summary>
      private bool _done;

      /// <summary>
      /// When <see cref="_done"/> is true and this is non-null, the upstream
      /// terminated with OnError.
      /// </summary>
      private Exception _error;

      /// <summary>
      /// True once disposal was requested or a terminal signal was forwarded.
      /// </summary>
      private bool _disposed;

      /// <summary>
      /// Static delegate for long-running schedulers; avoids allocating a delegate
      /// that captures <c>this</c> each time a drain is scheduled.
      /// </summary>
      private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DrainLongRunningAction =
          (self, cancel) => self.DrainLongRunning();

      /// <summary>
      /// Static delegate for ordinary schedulers; avoids allocating a delegate
      /// that captures <c>this</c> each time a drain is scheduled.
      /// </summary>
      private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DrainShortRunningFunc =
          (scheduler, self) => self.DrainShortRunning(scheduler);

      /// <summary>
      /// Creates an observer that forwards signals to <paramref name="downstream"/>
      /// on <paramref name="scheduler"/>.
      /// </summary>
      public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream)
          : base(downstream)
      {
          _scheduler = scheduler;
          _longRunning = scheduler.AsLongRunning();
          _queue = new ConcurrentQueue<T>();
      }

      /// <summary>
      /// Marks the operator as disposed, disposes the base sink and, when
      /// <paramref name="disposing"/> is true, the drain task; leftover queued
      /// items are then discarded.
      /// </summary>
      protected override void Dispose(bool disposing)
      {
          Volatile.Write(ref _disposed, true);
          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          Disposable.TryDispose(ref _task);
          Clear(_queue);
      }

      /// <summary>
      /// Discards every item remaining in the queue; used on cancellation or failure.
      /// </summary>
      /// <param name="q">The queue to empty. It is passed in so that callers can hand
      /// over a copy of the reference they already hold instead of this method
      /// reading the field again.</param>
      private void Clear(ConcurrentQueue<T> q)
      {
          while (q.TryDequeue(out _))
          {
          }
      }

      /// <summary>Records completion and schedules a drain.</summary>
      public override void OnCompleted()
      {
          Volatile.Write(ref _done, true);
          Schedule();
      }

      /// <summary>Records the error and schedules a drain.</summary>
      public override void OnError(Exception error)
      {
          // The error must be stored before _done is published, because the drain
          // reads _error with a plain read after observing _done == true.
          _error = error;
          Volatile.Write(ref _done, true);
          Schedule();
      }

      /// <summary>Queues the value and schedules a drain.</summary>
      public override void OnNext(T value)
      {
          _queue.Enqueue(value);
          Schedule();
      }

      /// <summary>
      /// Submits the drain task to the appropriate scheduler when no drain is
      /// currently scheduled or running, i.e. when the work-in-progress counter
      /// moves from 0 to 1.
      /// </summary>
      private void Schedule()
      {
          if (Interlocked.Increment(ref _wip) != 1)
          {
              // A drain is already active; it will notice the incremented counter.
              return;
          }

          var drainTask = new SingleAssignmentDisposable();

          if (Disposable.TrySetMultiple(ref _task, drainTask))
          {
              var longRunning = _longRunning;

              drainTask.Disposable = longRunning != null
                  ? longRunning.ScheduleLongRunning(this, DrainLongRunningAction)
                  : _scheduler.Schedule(this, DrainShortRunningFunc);
          }

          // After a cancellation, discard the queued items. This does not need to
          // happen inside the exclusive (wip != 0) drain because the queue supports
          // multiple consumers.
          if (Volatile.Read(ref _disposed))
          {
              Clear(_queue);
          }
      }

      /// <summary>
      /// Emits at most one signal per invocation, for schedulers without
      /// long-running support, and reschedules itself while work remains.
      /// </summary>
      /// <param name="recursiveScheduler">Scheduler used to schedule the next drain step if one is needed.</param>
      /// <returns>Always an empty disposable; the rescheduled task is tracked in <see cref="_task"/> instead.</returns>
      private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
      {
          DrainStep(_queue, false);

          if (Interlocked.Decrement(ref _wip) != 0)
          {
              // The disposable of Schedule() is deliberately not returned: doing so may
              // chain together a long string of ScheduledItems, causing a
              // StackOverflowException upon Dispose().
              Disposable.TrySetMultiple(ref _task, recursiveScheduler.Schedule(this, DrainShortRunningFunc));
          }

          return Disposable.Empty;
      }

      /// <summary>
      /// Performs one drain step: checks the disposed state, the terminated state
      /// and the queue, and forwards the appropriate signal to the downstream.
      /// </summary>
      /// <param name="q">The queue to drain, passed in by the caller (the long-running
      /// drain loop reads the field once up front and reuses that reference).</param>
      /// <param name="delayError">When true, an error is forwarded only after all
      /// queued items have been emitted; when false, it is forwarded as soon as it
      /// is observed.</param>
      /// <returns>True if the drain loop should stop; false if an item was emitted
      /// and the loop should continue.</returns>
      private bool DrainStep(ConcurrentQueue<T> q, bool delayError)
      {
          // Disposed: discard whatever is still queued and stop.
          if (Volatile.Read(ref _disposed))
          {
              Clear(q);
              return true;
          }

          // Has the upstream terminated? This must be read before the dequeue below,
          // so that "terminated and empty" really means nothing more can arrive.
          var terminated = Volatile.Read(ref _done);

          if (terminated && !delayError)
          {
              // _error is written before _done is published, so a plain read is
              // sufficient once _done has been observed as true.
              var failure = _error;

              if (failure != null)
              {
                  Volatile.Write(ref _disposed, true);
                  ForwardOnError(failure);
                  return true;
              }
          }

          // An item is available: emit it and keep looping.
          if (q.TryDequeue(out var item))
          {
              ForwardOnNext(item);
              return false;
          }

          // The queue is empty and the upstream has terminated: emit the terminal
          // signal; no further signals can happen.
          if (terminated)
          {
              Volatile.Write(ref _disposed, true);

              // Same reasoning as above: a plain read of _error is safe here.
              var failure = _error;

              if (failure != null)
              {
                  ForwardOnError(failure);
              }
              else
              {
                  ForwardOnCompleted();
              }
          }

          // Either terminated, or empty with the upstream still active: stop for now.
          return true;
      }

      /// <summary>
      /// Emits as many signals as possible to the downstream. Used on a long-running
      /// scheduler, where the drain may occupy its thread for as long as needed.
      /// </summary>
      private void DrainLongRunning()
      {
          // Read the field once up front and reuse the reference in the loop.
          var q = _queue;
          var missed = 1;

          do
          {
              // delayError: true - because the test
              // ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
              // expects something that almost looks like full delayError.
              while (!DrainStep(q, true))
              {
              }

              // Subtract the work accounted for so far; a non-zero result means
              // more signals arrived in the meantime and another pass is needed.
              missed = Interlocked.Add(ref _wip, -missed);
          }
          while (missed != 0);
      }
  }
  569. }