ScheduledObserver.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive
  9. {
  10. #if !NO_PERF && !NO_CDS
  11. using System.Collections.Concurrent;
  12. using System.Diagnostics;
  /// <summary>
  /// Observer that records incoming notifications and replays them to a wrapped
  /// observer from work scheduled on an <see cref="IScheduler"/>.
  /// </summary>
  /// <remarks>
  /// Notifications are only recorded by the <c>On*Core</c> methods; delivery starts once
  /// <see cref="EnsureActive()"/> or <see cref="EnsureActive(int)"/> is called.
  /// Two delivery strategies exist. If <c>AsLongRunning()</c> on the scheduler yields a
  /// non-null <see cref="ISchedulerLongRunning"/>, one long-running dispatcher loop is used
  /// and woken through a semaphore. Otherwise a recursively scheduled action drains the
  /// queue and coordinates with producers through the <c>_state</c> field.
  /// </remarks>
  /// <typeparam name="T">Type of the values carried by OnNext notifications.</typeparam>
  internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
  {
    // Values of _state; only the recursive-scheduling path (EnsureActiveSlow/Run) uses them.
    private const int Stopped = 0;   // no consumer is scheduled
    private const int Running = 1;   // a consumer owns the queue
    private const int Pending = 2;   // a producer asked the running consumer to look again
    private const int Faulted = 9;   // the downstream OnNext threw; nothing is scheduled anymore

    private volatile int _state = Stopped;

    // Buffered OnNext values plus the recorded terminal notification.
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private volatile bool _failed;
    private volatile Exception _error;
    private volatile bool _completed;

    private readonly IObserver<T> _observer;
    private readonly IScheduler _scheduler;
    private readonly ISchedulerLongRunning _longRunning;
    private readonly SerialDisposable _disposable = new SerialDisposable();

    // Long-running path only: lazy-start gate, wake-up semaphore, and a disposable that
    // releases the semaphore once so a blocked dispatcher can observe cancellation.
    private readonly object _dispatcherInitGate = new object();
    private readonly SemaphoreSlim _dispatcherEvent;
    private readonly IDisposable _dispatcherEventRelease;
    private IDisposable _dispatcherJob;

    /// <summary>
    /// Creates an observer that forwards to <paramref name="observer"/> using
    /// <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="scheduler">Scheduler used to run the delivery work.</param>
    /// <param name="observer">Observer that receives the replayed notifications.</param>
    public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
    {
      _scheduler = scheduler;
      _observer = observer;
      _longRunning = scheduler.AsLongRunning();

      if (_longRunning == null)
      {
        // Recursive-scheduling path: the semaphore members stay null.
        return;
      }

      _dispatcherEvent = new SemaphoreSlim(0);
      _dispatcherEventRelease = Disposable.Create(() => _dispatcherEvent.Release());
    }

    /// <summary>
    /// Starts the long-running dispatcher the first time it is needed.
    /// </summary>
    private void EnsureDispatcher()
    {
      if (_dispatcherJob != null)
      {
        return;
      }

      lock (_dispatcherInitGate)
      {
        // Re-check under the lock; another thread may have started the job meanwhile.
        if (_dispatcherJob != null)
        {
          return;
        }

        _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
        _disposable.Disposable = StableCompositeDisposable.Create(_dispatcherJob, _dispatcherEventRelease);
      }
    }

    /// <summary>
    /// Blocks until the dispatcher semaphore is signaled.
    /// </summary>
    /// <param name="cancel">Cancellation handle of the long-running job.</param>
    /// <returns><c>false</c> if <paramref name="cancel"/> is disposed after waking up; otherwise <c>true</c>.</returns>
    private bool WaitForSignal(ICancelable cancel)
    {
      _dispatcherEvent.Wait();
      return !cancel.IsDisposed;
    }

    /// <summary>
    /// Removes and drops every value currently in the queue.
    /// </summary>
    private void DiscardQueuedItems()
    {
      var discarded = default(T);
      while (_queue.TryDequeue(out discarded))
      {
      }
    }

    /// <summary>
    /// Body of the long-running dispatcher: waits for a signal, forwards queued values,
    /// then forwards a recorded terminal notification if there is one.
    /// </summary>
    /// <param name="cancel">Cancellation handle of the long-running job.</param>
    private void Dispatch(ICancelable cancel)
    {
      for (;;)
      {
        if (!WaitForSignal(cancel))
        {
          return;
        }

        var item = default(T);
        while (_queue.TryDequeue(out item))
        {
          try
          {
            _observer.OnNext(item);
          }
          catch
          {
            // Downstream failed: drop whatever is still buffered, then propagate.
            DiscardQueuedItems();
            throw;
          }

          // One semaphore wait is taken before each further dequeue attempt.
          if (!WaitForSignal(cancel))
          {
            return;
          }
        }

        if (_failed)
        {
          _observer.OnError(_error);
          Dispose();
          return;
        }

        if (_completed)
        {
          _observer.OnCompleted();
          Dispose();
          return;
        }
      }
    }

    /// <summary>
    /// Requests delivery for one recorded notification; same as <c>EnsureActive(1)</c>.
    /// </summary>
    public void EnsureActive()
    {
      EnsureActive(1);
    }

    /// <summary>
    /// Requests delivery of recorded notifications.
    /// </summary>
    /// <param name="n">
    /// Number of semaphore releases to issue on the long-running path (skipped when not
    /// positive). The recursive-scheduling path does not use this value.
    /// </param>
    public void EnsureActive(int n)
    {
      if (_longRunning == null)
      {
        EnsureActiveSlow();
        return;
      }

      if (n > 0)
      {
        _dispatcherEvent.Release(n);
      }

      EnsureDispatcher();
    }

    /// <summary>
    /// Recursive-scheduling path: schedules <see cref="Run"/> if this caller became the
    /// owner of the consumer loop.
    /// </summary>
    private void EnsureActiveSlow()
    {
      if (TryBecomeOwner())
      {
        _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
      }
    }

    /// <summary>
    /// Attempts the Stopped -> Running transition; if a consumer is already running,
    /// flags it as Pending instead.
    /// </summary>
    /// <returns>
    /// <c>true</c> if the caller moved the state from Stopped to Running and therefore has
    /// to schedule the consumer; <c>false</c> if the state was Faulted, already Pending,
    /// or was successfully moved from Running to Pending.
    /// </returns>
    private bool TryBecomeOwner()
    {
#pragma warning disable 0420
      for (;;)
      {
        var observed = Interlocked.CompareExchange(ref _state, Running, Stopped);

        switch (observed)
        {
          case Stopped:
            // We performed Stopped -> Running, so we own the consumer loop.
            return true;

          case Faulted:
            return false;

          case Pending:
            // Someone already told the consumer to re-check.
            return false;

          case Running:
            // A consumer is active. It may be about to stop because it saw an empty
            // queue just before our item was enqueued:
            //
            //   C: _queue.TryDequeue == false                   (Running)
            //   P: _queue.Enqueue(...), EnsureActive sees Running
            //   C: transitions to Stopped                       (Stopped)
            //
            // Without further action P would assume C is still working and never
            // schedule a new consumer. Moving Running -> Pending makes the consumer's
            // CAS to Stopped fail, which forces it to loop and re-read the queue and
            // the terminal flags. If our CAS loses, the state changed under us and we
            // start over.
            if (Interlocked.CompareExchange(ref _state, Pending, Running) == Running)
            {
              return false;
            }
            break;
        }
      }
#pragma warning restore 0420
    }

    /// <summary>
    /// Recursive consumer: delivers at most one queued value per invocation and then
    /// re-schedules itself through <paramref name="recurse"/>.
    /// </summary>
    /// <param name="state">Opaque state passed through to <paramref name="recurse"/>.</param>
    /// <param name="recurse">Callback that schedules the next invocation.</param>
    private void Run(object state, Action<object> recurse)
    {
#pragma warning disable 0420
      var item = default(T);

      for (;;)
      {
        if (_queue.TryDequeue(out item))
        {
          break;
        }

        if (_failed)
        {
          // A value may have been enqueued after the failed dequeue above but before
          // _failed was set:
          //
          //   C: _queue.TryDequeue == false
          //   P: OnNext -> _queue.Enqueue(...)
          //   P: OnError -> _failed = true
          //   C: sees _failed
          //
          // Forwarding the error now would lose that value, so go back and dequeue.
          if (!_queue.IsEmpty)
          {
            continue;
          }

          Interlocked.Exchange(ref _state, Stopped);
          _observer.OnError(_error);
          Dispose();
          return;
        }

        if (_completed)
        {
          // Same race as above, with OnCompleted in place of OnError.
          if (!_queue.IsEmpty)
          {
            continue;
          }

          Interlocked.Exchange(ref _state, Stopped);
          _observer.OnCompleted();
          Dispose();
          return;
        }

        var previous = Interlocked.CompareExchange(ref _state, Stopped, Running);
        if (previous == Running || previous == Faulted)
        {
          // Either we stopped cleanly or the observer is faulted; nothing more to do.
          return;
        }

        Debug.Assert(previous == Pending);

        // A producer set Pending to keep us from stopping: go back to Running and
        // re-examine the queue and the terminal flags.
        _state = Running;
      }

      Interlocked.Exchange(ref _state, Running);

      try
      {
        _observer.OnNext(item);
      }
      catch
      {
        // Mark the observer as faulted, drop buffered values, and propagate.
        Interlocked.Exchange(ref _state, Faulted);
        DiscardQueuedItems();
        throw;
      }
#pragma warning restore 0420

      recurse(state);
    }

    /// <summary>Buffers <paramref name="value"/> for later delivery.</summary>
    /// <param name="value">Value to enqueue.</param>
    protected override void OnNextCore(T value)
    {
      _queue.Enqueue(value);
    }

    /// <summary>Records the error; the error is stored before the flag is raised.</summary>
    /// <param name="exception">Error to forward to the wrapped observer.</param>
    protected override void OnErrorCore(Exception exception)
    {
      _error = exception;
      _failed = true;
    }

    /// <summary>Records that the sequence completed.</summary>
    protected override void OnCompletedCore()
    {
      _completed = true;
    }

    /// <summary>
    /// Disposes the base observer and, when <paramref name="disposing"/> is true, the
    /// disposable holding the scheduled work.
    /// </summary>
    /// <param name="disposing">True when called from an explicit dispose.</param>
    protected override void Dispose(bool disposing)
    {
      base.Dispose(disposing);

      if (!disposing)
      {
        return;
      }

      _disposable.Dispose();
    }
  }
  261. #else
  /// <summary>
  /// Lock-based variant of the scheduled observer, compiled when the
  /// <c>!NO_PERF &amp;&amp; !NO_CDS</c> condition does not hold.
  /// </summary>
  /// <remarks>
  /// Every notification is stored as an <see cref="Action"/> in a queue guarded by a lock
  /// on the queue itself. A recursively scheduled action executes one entry per invocation.
  /// </remarks>
  /// <typeparam name="T">Type of the values carried by OnNext notifications.</typeparam>
  internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
  {
    // Both flags and the queue are only touched while holding lock (_queue).
    private bool _isAcquired = false;   // a consumer is currently scheduled
    private bool _hasFaulted = false;   // a queued action threw; no further scheduling
    private readonly Queue<Action> _queue = new Queue<Action>();

    private readonly IObserver<T> _observer;
    private readonly IScheduler _scheduler;
    private readonly SerialDisposable _disposable = new SerialDisposable();

    /// <summary>
    /// Creates an observer that forwards to <paramref name="observer"/> using
    /// <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="scheduler">Scheduler used to run the delivery work.</param>
    /// <param name="observer">Observer that receives the replayed notifications.</param>
    public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
    {
      _scheduler = scheduler;
      _observer = observer;
    }

    /// <summary>
    /// Requests delivery of queued notifications. The count is not used by this variant.
    /// </summary>
    /// <param name="n">Ignored.</param>
    public void EnsureActive(int n)
    {
      EnsureActive();
    }

    /// <summary>
    /// Schedules the consumer if there is queued work, the observer has not faulted,
    /// and no consumer is scheduled yet.
    /// </summary>
    public void EnsureActive()
    {
      lock (_queue)
      {
        if (_hasFaulted || _queue.Count <= 0)
        {
          return;
        }

        if (_isAcquired)
        {
          // Somebody else already owns the consumer.
          return;
        }

        _isAcquired = true;
      }

      // Scheduling happens outside the lock.
      _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
    }

    /// <summary>
    /// Executes one queued action and re-schedules itself; gives up ownership when the
    /// queue is empty.
    /// </summary>
    /// <param name="state">Opaque state passed through to <paramref name="recurse"/>.</param>
    /// <param name="recurse">Callback that schedules the next invocation.</param>
    private void Run(object state, Action<object> recurse)
    {
      Action pending;

      lock (_queue)
      {
        if (_queue.Count <= 0)
        {
          _isAcquired = false;
          return;
        }

        pending = _queue.Dequeue();
      }

      try
      {
        pending();
      }
      catch
      {
        // Drop the remaining work and remember the fault before propagating.
        lock (_queue)
        {
          _queue.Clear();
          _hasFaulted = true;
        }

        throw;
      }

      recurse(state);
    }

    /// <summary>Queues delivery of <paramref name="value"/> to the wrapped observer.</summary>
    /// <param name="value">Value to forward.</param>
    protected override void OnNextCore(T value)
    {
      lock (_queue)
      {
        _queue.Enqueue(() => _observer.OnNext(value));
      }
    }

    /// <summary>Queues delivery of <paramref name="exception"/> to the wrapped observer.</summary>
    /// <param name="exception">Error to forward.</param>
    protected override void OnErrorCore(Exception exception)
    {
      lock (_queue)
      {
        _queue.Enqueue(() => _observer.OnError(exception));
      }
    }

    /// <summary>Queues delivery of the completion signal to the wrapped observer.</summary>
    protected override void OnCompletedCore()
    {
      lock (_queue)
      {
        _queue.Enqueue(() => _observer.OnCompleted());
      }
    }

    /// <summary>
    /// Disposes the base observer and, when <paramref name="disposing"/> is true, the
    /// disposable holding the scheduled work.
    /// </summary>
    /// <param name="disposing">True when called from an explicit dispose.</param>
    protected override void Dispose(bool disposing)
    {
      base.Dispose(disposing);

      if (!disposing)
      {
        return;
      }

      _disposable.Dispose();
    }
  }
  347. #endif
  /// <summary>
  /// Scheduled observer that requests delivery immediately after each notification is
  /// recorded, and that disposes an extra resource when it is disposed.
  /// </summary>
  /// <typeparam name="T">Type of the values carried by OnNext notifications.</typeparam>
  internal class ObserveOnObserver<T> : ScheduledObserver<T>
  {
    // Swapped to null on dispose so the resource is disposed at most once.
    private IDisposable _cancel;

    /// <summary>
    /// Creates the observer.
    /// </summary>
    /// <param name="scheduler">Scheduler passed to the base class.</param>
    /// <param name="observer">Observer passed to the base class.</param>
    /// <param name="cancel">Resource disposed together with this observer.</param>
    public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel)
      : base(scheduler, observer)
    {
      _cancel = cancel;
    }

    /// <summary>Records the value through the base class, then requests delivery.</summary>
    /// <param name="value">Value to forward.</param>
    protected override void OnNextCore(T value)
    {
      base.OnNextCore(value);
      EnsureActive();
    }

    /// <summary>Records the error through the base class, then requests delivery.</summary>
    /// <param name="exception">Error to forward.</param>
    protected override void OnErrorCore(Exception exception)
    {
      base.OnErrorCore(exception);
      EnsureActive();
    }

    /// <summary>Records completion through the base class, then requests delivery.</summary>
    protected override void OnCompletedCore()
    {
      base.OnCompletedCore();
      EnsureActive();
    }

    /// <summary>
    /// Disposes the base class first; when <paramref name="disposing"/> is true, also
    /// disposes the resource supplied at construction, if it has not been taken yet.
    /// </summary>
    /// <param name="disposing">True when called from an explicit dispose.</param>
    protected override void Dispose(bool disposing)
    {
      base.Dispose(disposing);

      if (!disposing)
      {
        return;
      }

      // Atomically take the reference so concurrent disposers cannot both see it.
      var taken = Interlocked.Exchange(ref _cancel, null);
      if (taken == null)
      {
        return;
      }

      taken.Dispose();
    }
  }
  /// <summary>
  /// Disposable observer whose delivery work is started on request through
  /// <c>EnsureActive</c>.
  /// </summary>
  /// <typeparam name="T">Type of the values carried by OnNext notifications.</typeparam>
  internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
  {
    /// <summary>
    /// Requests that delivery work be active.
    /// </summary>
    void EnsureActive();

    /// <summary>
    /// Requests that delivery work be active, passing a count.
    /// </summary>
    /// <param name="count">
    /// Count supplied by the caller. How it is used is up to the implementation: in this
    /// file one implementation ignores it and the other uses it as a semaphore release count.
    /// </param>
    void EnsureActive(int count);
  }
  389. }