// ScheduledObserver.cs
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. namespace System.Reactive
  7. {
  8. #if !NO_PERF && !NO_CDS
  9. using System.Collections.Concurrent;
  10. using System.Diagnostics;
  /// <summary>
  /// Observer that buffers the notifications it receives and forwards them to a wrapped
  /// observer from work scheduled on an <see cref="IScheduler"/>.
  /// </summary>
  /// <remarks>
  /// Two delivery strategies exist, selected once in the constructor:
  /// <list type="bullet">
  /// <item>If <c>scheduler.AsLongRunning()</c> returns a non-null value, a single long-running
  /// job (<see cref="Dispatch"/>) is started lazily and is woken through a semaphore.</item>
  /// <item>Otherwise a recursive scheduler action (<see cref="Run"/>) delivers one notification
  /// per invocation, coordinated through the <c>_state</c> field.</item>
  /// </list>
  /// Notifications are only recorded by the <c>On*Core</c> overrides; delivery starts when
  /// <see cref="EnsureActive()"/> or <see cref="EnsureActive(int)"/> is called.
  /// </remarks>
  /// <typeparam name="T">Type of the elements carried by OnNext notifications.</typeparam>
  internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
  {
      // States of the recursive consumer (Run) used when no long-running scheduler is available.
      private const int STOPPED = 0; // No Run invocation is in flight; EnsureActiveSlow must schedule one.
      private const int RUNNING = 1; // A Run invocation has been scheduled or is executing.
      private const int PENDING = 2; // A producer signalled new state while the consumer was RUNNING.
      private const int FAULTED = 9; // The wrapped observer's OnNext threw; nothing further is scheduled.

      private volatile int _state = STOPPED;

      // Buffered OnNext values, plus the terminal notification recorded out-of-band.
      private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
      private volatile bool _failed;
      private volatile Exception _error;
      private volatile bool _completed;

      private readonly IObserver<T> _observer;
      private readonly IScheduler _scheduler;
      private readonly ISchedulerLongRunning _longRunning;
      private readonly SerialDisposable _disposable = new SerialDisposable();

      // Long-running dispatcher plumbing. The semaphore and its release handle are only
      // created when _longRunning is non-null.
      private readonly object _dispatcherInitGate = new object();
      private readonly SemaphoreSlim _dispatcherEvent;
      private readonly IDisposable _dispatcherEventRelease;
      private IDisposable _dispatcherJob;

      /// <summary>
      /// Creates an observer that forwards notifications to <paramref name="observer"/> using
      /// <paramref name="scheduler"/>.
      /// </summary>
      /// <param name="scheduler">Scheduler on which the wrapped observer is invoked.</param>
      /// <param name="observer">Observer that receives the buffered notifications.</param>
      public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
      {
          _scheduler = scheduler;
          _observer = observer;
          _longRunning = _scheduler.AsLongRunning();

          if (_longRunning != null)
          {
              _dispatcherEvent = new SemaphoreSlim(0);

              // Disposing this handle releases one permit so a dispatcher blocked in Wait()
              // wakes up and can observe that it has been cancelled.
              _dispatcherEventRelease = Disposable.Create(() => _dispatcherEvent.Release());
          }
      }

      /// <summary>
      /// Starts the long-running dispatcher job the first time it is needed and registers it,
      /// together with the semaphore release handle, in <c>_disposable</c>.
      /// </summary>
      private void EnsureDispatcher()
      {
          if (_dispatcherJob != null)
          {
              return;
          }

          lock (_dispatcherInitGate)
          {
              if (_dispatcherJob != null)
              {
                  return;
              }

              _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
              _disposable.Disposable = StableCompositeDisposable.Create(_dispatcherJob, _dispatcherEventRelease);
          }
      }

      /// <summary>
      /// Body of the long-running job. One semaphore permit is consumed before the first
      /// dequeue attempt of each outer iteration and again after every delivered element;
      /// permits are released by <see cref="EnsureActive(int)"/> and by disposal.
      /// </summary>
      /// <param name="cancel">Cancellation handle checked after every wake-up.</param>
      private void Dispatch(ICancelable cancel)
      {
          while (true)
          {
              _dispatcherEvent.Wait();
              if (cancel.IsDisposed)
              {
                  return;
              }

              T item;
              while (_queue.TryDequeue(out item))
              {
                  try
                  {
                      _observer.OnNext(item);
                  }
                  catch
                  {
                      // The wrapped observer failed: drop whatever is still buffered and let
                      // the exception propagate out of the job.
                      DiscardQueuedItems();
                      throw;
                  }

                  _dispatcherEvent.Wait();
                  if (cancel.IsDisposed)
                  {
                      return;
                  }
              }

              // The queue was seen empty; deliver a recorded terminal notification, if any.
              if (_failed)
              {
                  _observer.OnError(_error);
                  Dispose();
                  return;
              }

              if (_completed)
              {
                  _observer.OnCompleted();
                  Dispose();
                  return;
              }
          }
      }

      /// <summary>
      /// Signals that one notification has been recorded and must be delivered.
      /// </summary>
      public void EnsureActive()
      {
          EnsureActive(1);
      }

      /// <summary>
      /// Signals that notifications have been recorded and must be delivered.
      /// </summary>
      /// <param name="n">
      /// Number of semaphore permits released to the long-running dispatcher (only when
      /// positive). Not used when the recursive scheduling path is taken.
      /// </param>
      public void EnsureActive(int n)
      {
          if (_longRunning == null)
          {
              EnsureActiveSlow();
              return;
          }

          if (n > 0)
          {
              _dispatcherEvent.Release(n);
          }

          EnsureDispatcher();
      }

      /// <summary>
      /// Recursive-scheduling path: makes sure a <see cref="Run"/> invocation is in flight, or
      /// that an already running one re-examines the queue and terminal flags before stopping.
      /// </summary>
      private void EnsureActiveSlow()
      {
  #pragma warning disable 0420 // volatile field passed by reference to Interlocked
          while (true)
          {
              var observed = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);

              if (observed == STOPPED)
              {
                  // We performed the STOPPED -> RUNNING transition, so we own the consumer
                  // and are responsible for scheduling it.
                  _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
                  return;
              }

              if (observed == FAULTED)
              {
                  return;
              }

              // The consumer is active. Simply trusting that would lose a wake-up:
              //
              //   C: _queue.TryDequeue == false              (RUNNING)
              //   P: _queue.Enqueue(...); EnsureActive sees RUNNING and does nothing
              //   C: transitions to STOPPED                  (STOPPED)
              //
              // The item enqueued by P would then sit in the queue with nobody scheduled.
              // Moving RUNNING -> PENDING prevents this: the consumer only stops through a
              // CAS from RUNNING, so a PENDING state makes that CAS fail and forces it to
              // loop and re-read the queue and the terminal flags. Something will have
              // changed, because EnsureActive should only be called after an IObserver<T>
              // method that touches this state has been invoked.
              if (observed == PENDING)
              {
                  return;
              }

              if (observed == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING)
              {
                  return;
              }

              // The state changed between the two CAS operations; start over.
          }
  #pragma warning restore 0420
      }

      /// <summary>
      /// Recursive scheduler action: delivers at most one buffered element (or the terminal
      /// notification) and then asks the scheduler to invoke it again.
      /// </summary>
      /// <param name="state">Unused state object, passed through to <paramref name="recurse"/>.</param>
      /// <param name="recurse">Callback that schedules the next invocation of this action.</param>
      private void Run(object state, Action<object> recurse)
      {
  #pragma warning disable 0420 // volatile field passed by reference to Interlocked
          T item;
          while (!_queue.TryDequeue(out item))
          {
              if (_failed)
              {
                  // An OnNext may have been enqueued after the failed dequeue above but
                  // before OnError set _failed:
                  //
                  //   C: _queue.TryDequeue == false
                  //   P: OnNext(...)  -> _queue.Enqueue(...)
                  //   P: OnError(...) -> _failed = true
                  //   C: sees _failed and would report the error, losing the OnNext
                  //
                  // so re-check the queue before emitting the terminal notification.
                  if (!_queue.IsEmpty)
                  {
                      continue;
                  }

                  Interlocked.Exchange(ref _state, STOPPED);
                  _observer.OnError(_error);
                  Dispose();
                  return;
              }

              if (_completed)
              {
                  // Same race as above, with OnCompleted in place of OnError.
                  if (!_queue.IsEmpty)
                  {
                      continue;
                  }

                  Interlocked.Exchange(ref _state, STOPPED);
                  _observer.OnCompleted();
                  Dispose();
                  return;
              }

              var previous = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);
              if (previous == RUNNING || previous == FAULTED)
              {
                  // Either we stopped cleanly (RUNNING -> STOPPED succeeded) or delivery
                  // has faulted; in both cases there is nothing more to do.
                  return;
              }

              Debug.Assert(previous == PENDING);

              // A producer put us in PENDING to veto the transition to STOPPED; go back to
              // RUNNING and look at the queue and terminal flags again.
              _state = RUNNING;
          }

          Interlocked.Exchange(ref _state, RUNNING);

          try
          {
              _observer.OnNext(item);
          }
          catch
          {
              Interlocked.Exchange(ref _state, FAULTED);
              DiscardQueuedItems();
              throw;
          }
  #pragma warning restore 0420

          recurse(state);
      }

      /// <summary>
      /// Removes and drops every element currently buffered in the queue.
      /// </summary>
      private void DiscardQueuedItems()
      {
          T discarded;
          while (_queue.TryDequeue(out discarded))
          {
          }
      }

      /// <summary>
      /// Buffers an element. Delivery is not triggered here.
      /// </summary>
      /// <param name="value">Element to buffer.</param>
      protected override void OnNextCore(T value)
      {
          _queue.Enqueue(value);
      }

      /// <summary>
      /// Records the error notification. The exception is stored before the flag is raised
      /// so a consumer that observes <c>_failed</c> also finds <c>_error</c> assigned.
      /// </summary>
      /// <param name="exception">Error to report to the wrapped observer.</param>
      protected override void OnErrorCore(Exception exception)
      {
          _error = exception;
          _failed = true;
      }

      /// <summary>
      /// Records the completion notification. Delivery is not triggered here.
      /// </summary>
      protected override void OnCompletedCore()
      {
          _completed = true;
      }

      /// <summary>
      /// Disposes the base observer and, when <paramref name="disposing"/> is true, the
      /// scheduled work held in <c>_disposable</c>.
      /// </summary>
      /// <param name="disposing">True when called from an explicit dispose.</param>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          _disposable.Dispose();
      }
  }
  259. #else
  /// <summary>
  /// Lock-based variant of the scheduled observer, compiled when NO_PERF or NO_CDS is defined.
  /// Every notification is captured as a delegate in a queue and the queue is drained, one
  /// delegate per invocation, by a recursive action scheduled on the supplied scheduler.
  /// </summary>
  /// <typeparam name="T">Type of the elements carried by OnNext notifications.</typeparam>
  internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
  {
      // Both flags and the queue itself are only accessed while holding lock (_queue).
      private bool _isAcquired = false; // True while a Run chain owns the queue.
      private bool _hasFaulted = false; // True once a queued delegate has thrown.
      private readonly Queue<Action> _queue = new Queue<Action>();

      private readonly IObserver<T> _observer;
      private readonly IScheduler _scheduler;
      private readonly SerialDisposable _disposable = new SerialDisposable();

      /// <summary>
      /// Creates an observer that forwards notifications to <paramref name="observer"/> using
      /// <paramref name="scheduler"/>.
      /// </summary>
      /// <param name="scheduler">Scheduler on which the queued notifications are run.</param>
      /// <param name="observer">Observer that receives the queued notifications.</param>
      public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
      {
          _scheduler = scheduler;
          _observer = observer;
      }

      /// <summary>
      /// Same as <see cref="EnsureActive()"/>; the count is not used by this variant.
      /// </summary>
      /// <param name="n">Ignored.</param>
      public void EnsureActive(int n)
      {
          EnsureActive();
      }

      /// <summary>
      /// Schedules the draining action if there is queued work, delivery has not faulted and
      /// no draining chain is currently active.
      /// </summary>
      public void EnsureActive()
      {
          bool mustSchedule;

          lock (_queue)
          {
              if (_hasFaulted || _queue.Count == 0)
              {
                  return;
              }

              // Only the caller that flips _isAcquired from false to true schedules the drain.
              mustSchedule = !_isAcquired;
              _isAcquired = true;
          }

          if (mustSchedule)
          {
              _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
          }
      }

      /// <summary>
      /// Recursive scheduler action: runs one queued delegate and reschedules itself, or
      /// gives up ownership of the queue when it is found empty.
      /// </summary>
      /// <param name="state">Unused state object, passed through to <paramref name="recurse"/>.</param>
      /// <param name="recurse">Callback that schedules the next invocation of this action.</param>
      private void Run(object state, Action<object> recurse)
      {
          Action pending;

          lock (_queue)
          {
              if (_queue.Count == 0)
              {
                  _isAcquired = false;
                  return;
              }

              pending = _queue.Dequeue();
          }

          try
          {
              // Invoked outside the lock so producers are not blocked while the wrapped
              // observer runs.
              pending();
          }
          catch
          {
              lock (_queue)
              {
                  _queue.Clear();
                  _hasFaulted = true;
              }

              throw;
          }

          recurse(state);
      }

      /// <summary>
      /// Appends a notification delegate to the queue under the queue lock.
      /// </summary>
      /// <param name="notification">Delegate that forwards one notification.</param>
      private void EnqueueNotification(Action notification)
      {
          lock (_queue)
          {
              _queue.Enqueue(notification);
          }
      }

      /// <summary>
      /// Queues an OnNext notification. Delivery is not triggered here.
      /// </summary>
      /// <param name="value">Element to forward.</param>
      protected override void OnNextCore(T value)
      {
          EnqueueNotification(() => _observer.OnNext(value));
      }

      /// <summary>
      /// Queues an OnError notification. Delivery is not triggered here.
      /// </summary>
      /// <param name="exception">Error to forward.</param>
      protected override void OnErrorCore(Exception exception)
      {
          EnqueueNotification(() => _observer.OnError(exception));
      }

      /// <summary>
      /// Queues an OnCompleted notification. Delivery is not triggered here.
      /// </summary>
      protected override void OnCompletedCore()
      {
          EnqueueNotification(() => _observer.OnCompleted());
      }

      /// <summary>
      /// Disposes the base observer and, when <paramref name="disposing"/> is true, the
      /// scheduled work held in <c>_disposable</c>.
      /// </summary>
      /// <param name="disposing">True when called from an explicit dispose.</param>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          _disposable.Dispose();
      }
  }
  345. #endif
  /// <summary>
  /// Scheduled observer that triggers delivery immediately after every notification it
  /// records, and that disposes an additional resource when it is disposed.
  /// </summary>
  /// <typeparam name="T">Type of the elements carried by OnNext notifications.</typeparam>
  internal class ObserveOnObserver<T> : ScheduledObserver<T>
  {
      // Swapped to null on first dispose so the resource is disposed at most once.
      private IDisposable _cancel;

      /// <summary>
      /// Creates the observer.
      /// </summary>
      /// <param name="scheduler">Scheduler passed to the base scheduled observer.</param>
      /// <param name="observer">Observer passed to the base scheduled observer.</param>
      /// <param name="cancel">Resource disposed together with this observer; may be null.</param>
      public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel)
          : base(scheduler, observer)
      {
          _cancel = cancel;
      }

      /// <summary>
      /// Records the element through the base class, then requests delivery.
      /// </summary>
      /// <param name="value">Element to forward.</param>
      protected override void OnNextCore(T value)
      {
          base.OnNextCore(value);
          EnsureActive();
      }

      /// <summary>
      /// Records the error through the base class, then requests delivery.
      /// </summary>
      /// <param name="exception">Error to forward.</param>
      protected override void OnErrorCore(Exception exception)
      {
          base.OnErrorCore(exception);
          EnsureActive();
      }

      /// <summary>
      /// Records the completion through the base class, then requests delivery.
      /// </summary>
      protected override void OnCompletedCore()
      {
          base.OnCompletedCore();
          EnsureActive();
      }

      /// <summary>
      /// Disposes the base observer and, when <paramref name="disposing"/> is true, the
      /// resource supplied to the constructor (once only).
      /// </summary>
      /// <param name="disposing">True when called from an explicit dispose.</param>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (!disposing)
          {
              return;
          }

          // Atomically take the resource so concurrent or repeated disposal releases it once.
          var resource = Interlocked.Exchange(ref _cancel, null);
          if (resource != null)
          {
              resource.Dispose();
          }
      }
  }
  /// <summary>
  /// Disposable observer whose delivery of recorded notifications is triggered explicitly
  /// through one of the <c>EnsureActive</c> overloads.
  /// </summary>
  /// <typeparam name="T">Type of the elements carried by OnNext notifications.</typeparam>
  internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
  {
      /// <summary>
      /// Requests that recorded notifications be delivered.
      /// </summary>
      void EnsureActive();

      /// <summary>
      /// Requests that recorded notifications be delivered.
      /// </summary>
      /// <param name="count">
      /// Number of notifications being signalled. The implementations in this file use it only
      /// on the long-running scheduler path and otherwise ignore it.
      /// </param>
      void EnsureActive(int count);
  }
  387. }