ScheduledObserver.cs 13 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. namespace System.Reactive
  7. {
  8. #if !NO_PERF && !NO_CDS
  9. using System.Collections.Concurrent;
  10. using System.Diagnostics;
  11. internal class ScheduledObserver<T> : ObserverBase<T>, IDisposable
  12. {
  13. private volatile int _state = 0;
  14. private const int STOPPED = 0;
  15. private const int RUNNING = 1;
  16. private const int PENDING = 2;
  17. private const int FAULTED = 9;
  18. private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  19. private volatile bool _failed;
  20. private volatile Exception _error;
  21. private volatile bool _completed;
  22. private readonly IObserver<T> _observer;
  23. private readonly IScheduler _scheduler;
  24. private readonly ISchedulerLongRunning _longRunning;
  25. private readonly SerialDisposable _disposable = new SerialDisposable();
  26. public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
  27. {
  28. _scheduler = scheduler;
  29. _observer = observer;
  30. _longRunning = _scheduler.AsLongRunning();
  31. if (_longRunning != null)
  32. _dispatcherEvent = new SemaphoreSlim(0);
  33. }
  34. private readonly object _dispatcherInitGate = new object();
  35. private SemaphoreSlim _dispatcherEvent;
  36. private IDisposable _dispatcherJob;
  37. private void EnsureDispatcher()
  38. {
  39. if (_dispatcherJob == null)
  40. {
  41. lock (_dispatcherInitGate)
  42. {
  43. if (_dispatcherJob == null)
  44. {
  45. _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
  46. _disposable.Disposable = new CompositeDisposable(2)
  47. {
  48. _dispatcherJob,
  49. Disposable.Create(() => _dispatcherEvent.Release())
  50. };
  51. }
  52. }
  53. }
  54. }
  55. private void Dispatch(ICancelable cancel)
  56. {
  57. while (true)
  58. {
  59. _dispatcherEvent.Wait();
  60. if (cancel.IsDisposed)
  61. return;
  62. var next = default(T);
  63. while (_queue.TryDequeue(out next))
  64. {
  65. try
  66. {
  67. _observer.OnNext(next);
  68. }
  69. catch
  70. {
  71. var nop = default(T);
  72. while (_queue.TryDequeue(out nop))
  73. ;
  74. throw;
  75. }
  76. _dispatcherEvent.Wait();
  77. if (cancel.IsDisposed)
  78. return;
  79. }
  80. if (_failed)
  81. {
  82. _observer.OnError(_error);
  83. Dispose();
  84. return;
  85. }
  86. if (_completed)
  87. {
  88. _observer.OnCompleted();
  89. Dispose();
  90. return;
  91. }
  92. }
  93. }
  94. public void EnsureActive()
  95. {
  96. EnsureActive(1);
  97. }
  98. public void EnsureActive(int n)
  99. {
  100. if (_longRunning != null)
  101. {
  102. if (n > 0)
  103. _dispatcherEvent.Release(n);
  104. EnsureDispatcher();
  105. }
  106. else
  107. EnsureActiveSlow();
  108. }
  109. private void EnsureActiveSlow()
  110. {
  111. var isOwner = false;
  112. #pragma warning disable 0420
  113. while (true)
  114. {
  115. var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);
  116. if (old == STOPPED)
  117. {
  118. isOwner = true; // RUNNING
  119. break;
  120. }
  121. if (old == FAULTED)
  122. return;
  123. // If we find the consumer loop running, we transition to PENDING to handle
  124. // the case where the queue is seen empty by the consumer, making it transition
  125. // to the STOPPED state, but we inserted an item into the queue.
  126. //
  127. // C: _queue.TryDequeue == false (RUNNING)
  128. // ----------------------------------------------
  129. // P: _queue.Enqueue(...)
  130. // EnsureActive
  131. // Exchange(ref _state, RUNNING) == RUNNING
  132. // ----------------------------------------------
  133. // C: transition to STOPPED (STOPPED)
  134. //
  135. // In this case, P would believe C is running and not invoke the scheduler
  136. // using the isOwner flag.
  137. //
  138. // By introducing an intermediate PENDING state and using CAS in the consumer
  139. // to only transition to STOPPED in case we were still RUNNING, we can force
  140. // the consumer to reconsider the decision to transition to STOPPED. In that
  141. // case, the consumer loops again and re-reads from the queue and other state
  142. // fields. At least one bit of state will have changed because EnsureActive
  143. // should only be called after invocation of IObserver<T> methods that touch
  144. // this state.
  145. //
  146. if (old == PENDING || old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING)
  147. break;
  148. }
  149. #pragma warning restore 0420
  150. if (isOwner)
  151. {
  152. _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
  153. }
  154. }
  155. private void Run(object state, Action<object> recurse)
  156. {
  157. #pragma warning disable 0420
  158. var next = default(T);
  159. while (!_queue.TryDequeue(out next))
  160. {
  161. if (_failed)
  162. {
  163. // Between transitioning to _failed and the queue check in the loop,
  164. // items could have been queued, so we can't stop yet. We don't spin
  165. // and immediately re-check the queue.
  166. //
  167. // C: _queue.TryDequeue == false
  168. // ----------------------------------------------
  169. // P: OnNext(...)
  170. // _queue.Enqueue(...) // Will get lost
  171. // P: OnError(...)
  172. // _failed = true
  173. // ----------------------------------------------
  174. // C: if (_failed)
  175. // _observer.OnError(...) // Lost an OnNext
  176. //
  177. if (!_queue.IsEmpty)
  178. continue;
  179. Interlocked.Exchange(ref _state, STOPPED);
  180. _observer.OnError(_error);
  181. Dispose();
  182. return;
  183. }
  184. if (_completed)
  185. {
  186. // Between transitioning to _completed and the queue check in the loop,
  187. // items could have been queued, so we can't stop yet. We don't spin
  188. // and immediately re-check the queue.
  189. //
  190. // C: _queue.TryDequeue == false
  191. // ----------------------------------------------
  192. // P: OnNext(...)
  193. // _queue.Enqueue(...) // Will get lost
  194. // P: OnCompleted(...)
  195. // _completed = true
  196. // ----------------------------------------------
  197. // C: if (_completed)
  198. // _observer.OnCompleted() // Lost an OnNext
  199. //
  200. if (!_queue.IsEmpty)
  201. continue;
  202. Interlocked.Exchange(ref _state, STOPPED);
  203. _observer.OnCompleted();
  204. Dispose();
  205. return;
  206. }
  207. var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);
  208. if (old == RUNNING || old == FAULTED)
  209. return;
  210. Debug.Assert(old == PENDING);
  211. // The producer has put us in the PENDING state to prevent us from
  212. // transitioning to STOPPED, so we go RUNNING again and re-check our state.
  213. _state = RUNNING;
  214. }
  215. Interlocked.Exchange(ref _state, RUNNING);
  216. #pragma warning restore 0420
  217. try
  218. {
  219. _observer.OnNext(next);
  220. }
  221. catch
  222. {
  223. #pragma warning disable 0420
  224. Interlocked.Exchange(ref _state, FAULTED);
  225. #pragma warning restore 0420
  226. var nop = default(T);
  227. while (_queue.TryDequeue(out nop))
  228. ;
  229. throw;
  230. }
  231. recurse(state);
  232. }
  233. protected override void OnNextCore(T value)
  234. {
  235. _queue.Enqueue(value);
  236. }
  237. protected override void OnErrorCore(Exception exception)
  238. {
  239. _error = exception;
  240. _failed = true;
  241. }
  242. protected override void OnCompletedCore()
  243. {
  244. _completed = true;
  245. }
  246. protected override void Dispose(bool disposing)
  247. {
  248. base.Dispose(disposing);
  249. if (disposing)
  250. {
  251. _disposable.Dispose();
  252. }
  253. }
  254. }
  255. #else
  256. class ScheduledObserver<T> : ObserverBase<T>, IDisposable
  257. {
  258. private bool _isAcquired = false;
  259. private bool _hasFaulted = false;
  260. private readonly Queue<Action> _queue = new Queue<Action>();
  261. private readonly IObserver<T> _observer;
  262. private readonly IScheduler _scheduler;
  263. private readonly SerialDisposable _disposable = new SerialDisposable();
  264. public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
  265. {
  266. _scheduler = scheduler;
  267. _observer = observer;
  268. }
  269. public void EnsureActive(int n)
  270. {
  271. EnsureActive();
  272. }
  273. public void EnsureActive()
  274. {
  275. var isOwner = false;
  276. lock (_queue)
  277. {
  278. if (!_hasFaulted && _queue.Count > 0)
  279. {
  280. isOwner = !_isAcquired;
  281. _isAcquired = true;
  282. }
  283. }
  284. if (isOwner)
  285. {
  286. _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
  287. }
  288. }
  289. private void Run(object state, Action<object> recurse)
  290. {
  291. var work = default(Action);
  292. lock (_queue)
  293. {
  294. if (_queue.Count > 0)
  295. work = _queue.Dequeue();
  296. else
  297. {
  298. _isAcquired = false;
  299. return;
  300. }
  301. }
  302. try
  303. {
  304. work();
  305. }
  306. catch
  307. {
  308. lock (_queue)
  309. {
  310. _queue.Clear();
  311. _hasFaulted = true;
  312. }
  313. throw;
  314. }
  315. recurse(state);
  316. }
  317. protected override void OnNextCore(T value)
  318. {
  319. lock (_queue)
  320. _queue.Enqueue(() => _observer.OnNext(value));
  321. }
  322. protected override void OnErrorCore(Exception exception)
  323. {
  324. lock (_queue)
  325. _queue.Enqueue(() => _observer.OnError(exception));
  326. }
  327. protected override void OnCompletedCore()
  328. {
  329. lock (_queue)
  330. _queue.Enqueue(() => _observer.OnCompleted());
  331. }
  332. protected override void Dispose(bool disposing)
  333. {
  334. base.Dispose(disposing);
  335. if (disposing)
  336. {
  337. _disposable.Dispose();
  338. }
  339. }
  340. }
  341. #endif
  342. class ObserveOnObserver<T> : ScheduledObserver<T>
  343. {
  344. private IDisposable _cancel;
  345. public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel)
  346. : base(scheduler, observer)
  347. {
  348. _cancel = cancel;
  349. }
  350. protected override void OnNextCore(T value)
  351. {
  352. base.OnNextCore(value);
  353. EnsureActive();
  354. }
  355. protected override void OnErrorCore(Exception exception)
  356. {
  357. base.OnErrorCore(exception);
  358. EnsureActive();
  359. }
  360. protected override void OnCompletedCore()
  361. {
  362. base.OnCompletedCore();
  363. EnsureActive();
  364. }
  365. protected override void Dispose(bool disposing)
  366. {
  367. base.Dispose(disposing);
  368. if (disposing)
  369. {
  370. var cancel = Interlocked.Exchange(ref _cancel, null);
  371. if (cancel != null)
  372. {
  373. cancel.Dispose();
  374. }
  375. }
  376. }
  377. }
  378. }