ScheduledObserver.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive
  9. {
  10. using System.Collections.Concurrent;
  11. using System.Diagnostics;
  12. internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
  13. {
  14. private volatile int _state = 0;
  15. private const int STOPPED = 0;
  16. private const int RUNNING = 1;
  17. private const int PENDING = 2;
  18. private const int FAULTED = 9;
  19. private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  20. private volatile bool _failed;
  21. private volatile Exception _error;
  22. private volatile bool _completed;
  23. private readonly IObserver<T> _observer;
  24. private readonly IScheduler _scheduler;
  25. private readonly ISchedulerLongRunning _longRunning;
  26. private readonly SerialDisposable _disposable = new SerialDisposable();
  27. public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
  28. {
  29. _scheduler = scheduler;
  30. _observer = observer;
  31. _longRunning = _scheduler.AsLongRunning();
  32. if (_longRunning != null)
  33. {
  34. _dispatcherEvent = new SemaphoreSlim(0);
  35. _dispatcherEventRelease = Disposable.Create(() => _dispatcherEvent.Release());
  36. }
  37. }
  38. private readonly object _dispatcherInitGate = new object();
  39. private readonly SemaphoreSlim _dispatcherEvent;
  40. private readonly IDisposable _dispatcherEventRelease;
  41. private IDisposable _dispatcherJob;
  42. private void EnsureDispatcher()
  43. {
  44. if (_dispatcherJob == null)
  45. {
  46. lock (_dispatcherInitGate)
  47. {
  48. if (_dispatcherJob == null)
  49. {
  50. _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
  51. _disposable.Disposable = StableCompositeDisposable.Create
  52. (
  53. _dispatcherJob,
  54. _dispatcherEventRelease
  55. );
  56. }
  57. }
  58. }
  59. }
  60. private void Dispatch(ICancelable cancel)
  61. {
  62. while (true)
  63. {
  64. _dispatcherEvent.Wait();
  65. if (cancel.IsDisposed)
  66. return;
  67. var next = default(T);
  68. while (_queue.TryDequeue(out next))
  69. {
  70. try
  71. {
  72. _observer.OnNext(next);
  73. }
  74. catch
  75. {
  76. var nop = default(T);
  77. while (_queue.TryDequeue(out nop))
  78. ;
  79. throw;
  80. }
  81. _dispatcherEvent.Wait();
  82. if (cancel.IsDisposed)
  83. return;
  84. }
  85. if (_failed)
  86. {
  87. _observer.OnError(_error);
  88. Dispose();
  89. return;
  90. }
  91. if (_completed)
  92. {
  93. _observer.OnCompleted();
  94. Dispose();
  95. return;
  96. }
  97. }
  98. }
  99. public void EnsureActive() => EnsureActive(1);
  100. public void EnsureActive(int n)
  101. {
  102. if (_longRunning != null)
  103. {
  104. if (n > 0)
  105. {
  106. _dispatcherEvent.Release(n);
  107. }
  108. EnsureDispatcher();
  109. }
  110. else
  111. EnsureActiveSlow();
  112. }
  113. private void EnsureActiveSlow()
  114. {
  115. var isOwner = false;
  116. while (true)
  117. {
  118. var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);
  119. if (old == STOPPED)
  120. {
  121. isOwner = true; // RUNNING
  122. break;
  123. }
  124. if (old == FAULTED)
  125. return;
  126. // If we find the consumer loop running, we transition to PENDING to handle
  127. // the case where the queue is seen empty by the consumer, making it transition
  128. // to the STOPPED state, but we inserted an item into the queue.
  129. //
  130. // C: _queue.TryDequeue == false (RUNNING)
  131. // ----------------------------------------------
  132. // P: _queue.Enqueue(...)
  133. // EnsureActive
  134. // Exchange(ref _state, RUNNING) == RUNNING
  135. // ----------------------------------------------
  136. // C: transition to STOPPED (STOPPED)
  137. //
  138. // In this case, P would believe C is running and not invoke the scheduler
  139. // using the isOwner flag.
  140. //
  141. // By introducing an intermediate PENDING state and using CAS in the consumer
  142. // to only transition to STOPPED in case we were still RUNNING, we can force
  143. // the consumer to reconsider the decision to transition to STOPPED. In that
  144. // case, the consumer loops again and re-reads from the queue and other state
  145. // fields. At least one bit of state will have changed because EnsureActive
  146. // should only be called after invocation of IObserver<T> methods that touch
  147. // this state.
  148. //
  149. if (old == PENDING || old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING)
  150. break;
  151. }
  152. if (isOwner)
  153. {
  154. _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
  155. }
  156. }
  157. private void Run(object state, Action<object> recurse)
  158. {
  159. var next = default(T);
  160. while (!_queue.TryDequeue(out next))
  161. {
  162. if (_failed)
  163. {
  164. // Between transitioning to _failed and the queue check in the loop,
  165. // items could have been queued, so we can't stop yet. We don't spin
  166. // and immediately re-check the queue.
  167. //
  168. // C: _queue.TryDequeue == false
  169. // ----------------------------------------------
  170. // P: OnNext(...)
  171. // _queue.Enqueue(...) // Will get lost
  172. // P: OnError(...)
  173. // _failed = true
  174. // ----------------------------------------------
  175. // C: if (_failed)
  176. // _observer.OnError(...) // Lost an OnNext
  177. //
  178. if (!_queue.IsEmpty)
  179. continue;
  180. Interlocked.Exchange(ref _state, STOPPED);
  181. _observer.OnError(_error);
  182. Dispose();
  183. return;
  184. }
  185. if (_completed)
  186. {
  187. // Between transitioning to _completed and the queue check in the loop,
  188. // items could have been queued, so we can't stop yet. We don't spin
  189. // and immediately re-check the queue.
  190. //
  191. // C: _queue.TryDequeue == false
  192. // ----------------------------------------------
  193. // P: OnNext(...)
  194. // _queue.Enqueue(...) // Will get lost
  195. // P: OnCompleted(...)
  196. // _completed = true
  197. // ----------------------------------------------
  198. // C: if (_completed)
  199. // _observer.OnCompleted() // Lost an OnNext
  200. //
  201. if (!_queue.IsEmpty)
  202. continue;
  203. Interlocked.Exchange(ref _state, STOPPED);
  204. _observer.OnCompleted();
  205. Dispose();
  206. return;
  207. }
  208. var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);
  209. if (old == RUNNING || old == FAULTED)
  210. return;
  211. Debug.Assert(old == PENDING);
  212. // The producer has put us in the PENDING state to prevent us from
  213. // transitioning to STOPPED, so we go RUNNING again and re-check our state.
  214. _state = RUNNING;
  215. }
  216. Interlocked.Exchange(ref _state, RUNNING);
  217. try
  218. {
  219. _observer.OnNext(next);
  220. }
  221. catch
  222. {
  223. Interlocked.Exchange(ref _state, FAULTED);
  224. var nop = default(T);
  225. while (_queue.TryDequeue(out nop))
  226. ;
  227. throw;
  228. }
  229. recurse(state);
  230. }
  231. protected override void OnNextCore(T value)
  232. {
  233. _queue.Enqueue(value);
  234. }
  235. protected override void OnErrorCore(Exception exception)
  236. {
  237. _error = exception;
  238. _failed = true;
  239. }
  240. protected override void OnCompletedCore()
  241. {
  242. _completed = true;
  243. }
  244. protected override void Dispose(bool disposing)
  245. {
  246. base.Dispose(disposing);
  247. if (disposing)
  248. {
  249. _disposable.Dispose();
  250. }
  251. }
  252. }
  253. internal sealed class ObserveOnObserver<T> : ScheduledObserver<T>
  254. {
  255. private IDisposable _run;
  256. public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer)
  257. : base(scheduler, observer)
  258. {
  259. }
  260. public void Run(IObservable<T> source)
  261. {
  262. Disposable.SetSingle(ref _run, source.SubscribeSafe(this));
  263. }
  264. protected override void OnNextCore(T value)
  265. {
  266. base.OnNextCore(value);
  267. EnsureActive();
  268. }
  269. protected override void OnErrorCore(Exception exception)
  270. {
  271. base.OnErrorCore(exception);
  272. EnsureActive();
  273. }
  274. protected override void OnCompletedCore()
  275. {
  276. base.OnCompletedCore();
  277. EnsureActive();
  278. }
  279. protected override void Dispose(bool disposing)
  280. {
  281. base.Dispose(disposing);
  282. if (disposing)
  283. {
  284. Disposable.TryDispose(ref _run);
  285. }
  286. }
  287. }
  288. internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
  289. {
  290. void EnsureActive();
  291. void EnsureActive(int count);
  292. }
  293. /// <summary>
  294. /// An ObserveOn operator implementation that uses lock-free
  295. /// techniques to signal events to the downstream.
  296. /// </summary>
  297. /// <typeparam name="T">The element type of the sequence.</typeparam>
  298. internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
  299. {
  300. readonly IObserver<T> downstream;
  301. readonly IScheduler scheduler;
  302. /// <summary>
  303. /// If not null, the <see cref="scheduler"/> supports
  304. /// long running tasks.
  305. /// </summary>
  306. readonly ISchedulerLongRunning longRunning;
  307. readonly ConcurrentQueue<T> queue;
  308. private IDisposable _run;
  309. /// <summary>
  310. /// The current task representing a running drain operation.
  311. /// </summary>
  312. IDisposable task;
  313. /// <summary>
  314. /// Indicates the work-in-progress state of this operator,
  315. /// zero means no work is currently being done.
  316. /// </summary>
  317. int wip;
  318. /// <summary>
  319. /// If true, the upstream has issued OnCompleted.
  320. /// </summary>
  321. bool done;
  322. /// <summary>
  323. /// If <see cref="done"/> is true and this is non-null, the upstream
  324. /// failed with an OnError.
  325. /// </summary>
  326. Exception error;
  327. /// <summary>
  328. /// Indicates a dispose has been requested.
  329. /// </summary>
  330. bool disposed;
  331. public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream)
  332. {
  333. this.downstream = downstream;
  334. this.scheduler = scheduler;
  335. this.longRunning = scheduler.AsLongRunning();
  336. this.queue = new ConcurrentQueue<T>();
  337. }
  338. public void Run(IObservable<T> source)
  339. {
  340. Disposable.SetSingle(ref _run, source.SubscribeSafe(this));
  341. }
  342. public void Dispose()
  343. {
  344. Volatile.Write(ref disposed, true);
  345. Disposable.TryDispose(ref task);
  346. Disposable.TryDispose(ref _run);
  347. Clear();
  348. }
  349. /// <summary>
  350. /// Remove remaining elements from the queue upon
  351. /// cancellation or failure.
  352. /// </summary>
  353. void Clear()
  354. {
  355. var q = queue;
  356. while (q.TryDequeue(out var _)) ;
  357. }
  358. public void OnCompleted()
  359. {
  360. Volatile.Write(ref done, true);
  361. Schedule();
  362. }
  363. public void OnError(Exception error)
  364. {
  365. this.error = error;
  366. Volatile.Write(ref done, true);
  367. Schedule();
  368. }
  369. public void OnNext(T value)
  370. {
  371. queue.Enqueue(value);
  372. Schedule();
  373. }
  374. /// <summary>
  375. /// Submit the drain task via the appropriate scheduler if
  376. /// there is no drain currently running (wip > 0).
  377. /// </summary>
  378. void Schedule()
  379. {
  380. if (Interlocked.Increment(ref wip) == 1)
  381. {
  382. var newTask = new SingleAssignmentDisposable();
  383. if (Disposable.TrySetMultiple(ref task, newTask))
  384. {
  385. var longRunning = this.longRunning;
  386. if (longRunning != null)
  387. {
  388. newTask.Disposable = longRunning.ScheduleLongRunning(this, DRAIN_LONG_RUNNING);
  389. }
  390. else
  391. {
  392. newTask.Disposable = scheduler.Schedule(this, DRAIN_SHORT_RUNNING);
  393. }
  394. }
  395. // If there was a cancellation, clear the queue
  396. // of items. This doesn't have to be inside the
  397. // wip != 0 (exclusive) mode as the queue
  398. // is of a multi-consumer type.
  399. if (Volatile.Read(ref disposed))
  400. {
  401. Clear();
  402. }
  403. }
  404. }
  405. /// <summary>
  406. /// The static action to be scheduled on a long running scheduler.
  407. /// Avoids creating a delegate that captures <code>this</code>
  408. /// whenever the signals have to be drained.
  409. /// </summary>
  410. static readonly Action<ObserveOnObserverNew<T>, ICancelable> DRAIN_LONG_RUNNING =
  411. (self, cancel) => self.DrainLongRunning();
  412. /// <summary>
  413. /// The static action to be scheduled on a simple scheduler.
  414. /// Avoids creating a delegate that captures <code>this</code>
  415. /// whenever the signals have to be drained.
  416. /// </summary>
  417. static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DRAIN_SHORT_RUNNING =
  418. (scheduler, self) => self.DrainShortRunning(scheduler);
  419. /// <summary>
  420. /// Emits at most one signal per run on a scheduler that doesn't like
  421. /// long running tasks.
  422. /// </summary>
  423. /// <param name="recursiveScheduler">The scheduler to use for scheduling the next signal emission if necessary.</param>
  424. /// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
  425. IDisposable DrainShortRunning(IScheduler recursiveScheduler)
  426. {
  427. DrainStep(queue, downstream, false);
  428. if (Interlocked.Decrement(ref wip) != 0)
  429. {
  430. return recursiveScheduler.Schedule(this, DRAIN_SHORT_RUNNING);
  431. }
  432. return Disposable.Empty;
  433. }
  434. /// <summary>
  435. /// Executes a drain step by checking the disposed state,
  436. /// checking for the terminated state and for an
  437. /// empty queue, issuing the appropriate signals to the
  438. /// given downstream.
  439. /// </summary>
  440. /// <param name="q">The queue to use.</param>
  441. /// <param name="downstream">The intended consumer of the events.</param>
  442. /// <param name="delayError">Should the errors be delayed until all
  443. /// queued items have been emitted to the downstream?</param>
  444. /// <returns>True if the drain loop should stop.</returns>
  445. bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delayError)
  446. {
  447. // Check if the operator has been disposed
  448. if (Volatile.Read(ref disposed))
  449. {
  450. // cleanup residue items in the queue
  451. Clear();
  452. return true;
  453. }
  454. // Has the upstream call OnCompleted?
  455. var d = Volatile.Read(ref done);
  456. if (d && !delayError)
  457. {
  458. // done = true happens before setting error
  459. // this is safe to be a plain read
  460. var ex = error;
  461. // if not null, there was an OnError call
  462. if (ex != null)
  463. {
  464. Volatile.Write(ref disposed, true);
  465. downstream.OnError(ex);
  466. return true;
  467. }
  468. }
  469. // get the next item from the queue if any
  470. var empty = !queue.TryDequeue(out var v);
  471. // the upstream called OnComplete and the queue is empty
  472. // that means we are done, no further signals can happen
  473. if (d && empty)
  474. {
  475. Volatile.Write(ref disposed, true);
  476. // done = true happens before setting error
  477. // this is safe to be a plain read
  478. var ex = error;
  479. // if not null, there was an OnError call
  480. if (ex != null)
  481. {
  482. downstream.OnError(ex);
  483. }
  484. else
  485. {
  486. // otherwise, complete normally
  487. downstream.OnCompleted();
  488. }
  489. return true;
  490. }
  491. else
  492. // the queue is empty and the upstream hasn't completed yet
  493. if (empty)
  494. {
  495. return true;
  496. }
  497. // emit the item
  498. downstream.OnNext(v);
  499. // keep looping
  500. return false;
  501. }
  502. /// <summary>
  503. /// Emits as many signals as possible to the downstream observer
  504. /// as this is executing a long-running scheduler so
  505. /// it can occupy that thread as long as it needs to.
  506. /// </summary>
  507. void DrainLongRunning()
  508. {
  509. var missed = 1;
  510. // read out fields upfront as the DrainStep uses atomics
  511. // that would force the re-read of these constant values
  512. // from memory, regardless of readonly, afaik
  513. var q = queue;
  514. var downstream = this.downstream;
  515. for (; ; )
  516. {
  517. for (; ; )
  518. {
  519. // delayError: true - because of
  520. // ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
  521. // expects something that almost looks like full delayError
  522. if (DrainStep(q, downstream, true))
  523. {
  524. break;
  525. }
  526. }
  527. missed = Interlocked.Add(ref wip, -missed);
  528. if (missed == 0)
  529. {
  530. break;
  531. }
  532. }
  533. }
  534. }
  535. }