ScheduledObserver.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive
  9. {
  10. using System.Collections.Concurrent;
  11. using System.Diagnostics;
  12. internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
  13. {
  14. private volatile int _state = 0;
  15. private const int STOPPED = 0;
  16. private const int RUNNING = 1;
  17. private const int PENDING = 2;
  18. private const int FAULTED = 9;
  19. private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  20. private volatile bool _failed;
  21. private volatile Exception _error;
  22. private volatile bool _completed;
  23. private readonly IObserver<T> _observer;
  24. private readonly IScheduler _scheduler;
  25. private readonly ISchedulerLongRunning _longRunning;
  26. private readonly SerialDisposable _disposable = new SerialDisposable();
  27. public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
  28. {
  29. _scheduler = scheduler;
  30. _observer = observer;
  31. _longRunning = _scheduler.AsLongRunning();
  32. if (_longRunning != null)
  33. {
  34. _dispatcherEvent = new SemaphoreSlim(0);
  35. _dispatcherEventRelease = Disposable.Create(() => _dispatcherEvent.Release());
  36. }
  37. }
  38. private readonly object _dispatcherInitGate = new object();
  39. private readonly SemaphoreSlim _dispatcherEvent;
  40. private readonly IDisposable _dispatcherEventRelease;
  41. private IDisposable _dispatcherJob;
  42. private void EnsureDispatcher()
  43. {
  44. if (_dispatcherJob == null)
  45. {
  46. lock (_dispatcherInitGate)
  47. {
  48. if (_dispatcherJob == null)
  49. {
  50. _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
  51. _disposable.Disposable = StableCompositeDisposable.Create
  52. (
  53. _dispatcherJob,
  54. _dispatcherEventRelease
  55. );
  56. }
  57. }
  58. }
  59. }
  60. private void Dispatch(ICancelable cancel)
  61. {
  62. while (true)
  63. {
  64. _dispatcherEvent.Wait();
  65. if (cancel.IsDisposed)
  66. return;
  67. var next = default(T);
  68. while (_queue.TryDequeue(out next))
  69. {
  70. try
  71. {
  72. _observer.OnNext(next);
  73. }
  74. catch
  75. {
  76. var nop = default(T);
  77. while (_queue.TryDequeue(out nop))
  78. ;
  79. throw;
  80. }
  81. _dispatcherEvent.Wait();
  82. if (cancel.IsDisposed)
  83. return;
  84. }
  85. if (_failed)
  86. {
  87. _observer.OnError(_error);
  88. Dispose();
  89. return;
  90. }
  91. if (_completed)
  92. {
  93. _observer.OnCompleted();
  94. Dispose();
  95. return;
  96. }
  97. }
  98. }
  99. public void EnsureActive() => EnsureActive(1);
  100. public void EnsureActive(int n)
  101. {
  102. if (_longRunning != null)
  103. {
  104. if (n > 0)
  105. {
  106. _dispatcherEvent.Release(n);
  107. }
  108. EnsureDispatcher();
  109. }
  110. else
  111. EnsureActiveSlow();
  112. }
  113. private void EnsureActiveSlow()
  114. {
  115. var isOwner = false;
  116. while (true)
  117. {
  118. var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);
  119. if (old == STOPPED)
  120. {
  121. isOwner = true; // RUNNING
  122. break;
  123. }
  124. if (old == FAULTED)
  125. return;
  126. // If we find the consumer loop running, we transition to PENDING to handle
  127. // the case where the queue is seen empty by the consumer, making it transition
  128. // to the STOPPED state, but we inserted an item into the queue.
  129. //
  130. // C: _queue.TryDequeue == false (RUNNING)
  131. // ----------------------------------------------
  132. // P: _queue.Enqueue(...)
  133. // EnsureActive
  134. // Exchange(ref _state, RUNNING) == RUNNING
  135. // ----------------------------------------------
  136. // C: transition to STOPPED (STOPPED)
  137. //
  138. // In this case, P would believe C is running and not invoke the scheduler
  139. // using the isOwner flag.
  140. //
  141. // By introducing an intermediate PENDING state and using CAS in the consumer
  142. // to only transition to STOPPED in case we were still RUNNING, we can force
  143. // the consumer to reconsider the decision to transition to STOPPED. In that
  144. // case, the consumer loops again and re-reads from the queue and other state
  145. // fields. At least one bit of state will have changed because EnsureActive
  146. // should only be called after invocation of IObserver<T> methods that touch
  147. // this state.
  148. //
  149. if (old == PENDING || old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING)
  150. break;
  151. }
  152. if (isOwner)
  153. {
  154. _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
  155. }
  156. }
  157. private void Run(object state, Action<object> recurse)
  158. {
  159. var next = default(T);
  160. while (!_queue.TryDequeue(out next))
  161. {
  162. if (_failed)
  163. {
  164. // Between transitioning to _failed and the queue check in the loop,
  165. // items could have been queued, so we can't stop yet. We don't spin
  166. // and immediately re-check the queue.
  167. //
  168. // C: _queue.TryDequeue == false
  169. // ----------------------------------------------
  170. // P: OnNext(...)
  171. // _queue.Enqueue(...) // Will get lost
  172. // P: OnError(...)
  173. // _failed = true
  174. // ----------------------------------------------
  175. // C: if (_failed)
  176. // _observer.OnError(...) // Lost an OnNext
  177. //
  178. if (!_queue.IsEmpty)
  179. continue;
  180. Interlocked.Exchange(ref _state, STOPPED);
  181. _observer.OnError(_error);
  182. Dispose();
  183. return;
  184. }
  185. if (_completed)
  186. {
  187. // Between transitioning to _completed and the queue check in the loop,
  188. // items could have been queued, so we can't stop yet. We don't spin
  189. // and immediately re-check the queue.
  190. //
  191. // C: _queue.TryDequeue == false
  192. // ----------------------------------------------
  193. // P: OnNext(...)
  194. // _queue.Enqueue(...) // Will get lost
  195. // P: OnCompleted(...)
  196. // _completed = true
  197. // ----------------------------------------------
  198. // C: if (_completed)
  199. // _observer.OnCompleted() // Lost an OnNext
  200. //
  201. if (!_queue.IsEmpty)
  202. continue;
  203. Interlocked.Exchange(ref _state, STOPPED);
  204. _observer.OnCompleted();
  205. Dispose();
  206. return;
  207. }
  208. var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);
  209. if (old == RUNNING || old == FAULTED)
  210. return;
  211. Debug.Assert(old == PENDING);
  212. // The producer has put us in the PENDING state to prevent us from
  213. // transitioning to STOPPED, so we go RUNNING again and re-check our state.
  214. _state = RUNNING;
  215. }
  216. Interlocked.Exchange(ref _state, RUNNING);
  217. try
  218. {
  219. _observer.OnNext(next);
  220. }
  221. catch
  222. {
  223. Interlocked.Exchange(ref _state, FAULTED);
  224. var nop = default(T);
  225. while (_queue.TryDequeue(out nop))
  226. ;
  227. throw;
  228. }
  229. recurse(state);
  230. }
  231. protected override void OnNextCore(T value)
  232. {
  233. _queue.Enqueue(value);
  234. }
  235. protected override void OnErrorCore(Exception exception)
  236. {
  237. _error = exception;
  238. _failed = true;
  239. }
  240. protected override void OnCompletedCore()
  241. {
  242. _completed = true;
  243. }
  244. protected override void Dispose(bool disposing)
  245. {
  246. base.Dispose(disposing);
  247. if (disposing)
  248. {
  249. _disposable.Dispose();
  250. }
  251. }
  252. }
  253. internal sealed class ObserveOnObserver<T> : ScheduledObserver<T>
  254. {
  255. private IDisposable _cancel;
  256. public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel)
  257. : base(scheduler, observer)
  258. {
  259. _cancel = cancel;
  260. }
  261. protected override void OnNextCore(T value)
  262. {
  263. base.OnNextCore(value);
  264. EnsureActive();
  265. }
  266. protected override void OnErrorCore(Exception exception)
  267. {
  268. base.OnErrorCore(exception);
  269. EnsureActive();
  270. }
  271. protected override void OnCompletedCore()
  272. {
  273. base.OnCompletedCore();
  274. EnsureActive();
  275. }
  276. protected override void Dispose(bool disposing)
  277. {
  278. base.Dispose(disposing);
  279. if (disposing)
  280. {
  281. Interlocked.Exchange(ref _cancel, null)?.Dispose();
  282. }
  283. }
  284. }
  285. internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
  286. {
  287. void EnsureActive();
  288. void EnsureActive(int count);
  289. }
  290. }