ReplaySubject.cs 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Threading;
  5. namespace System.Reactive.Subjects
  6. {
  7. /// <summary>
  8. /// Represents an object that is both an observable sequence as well as an observer.
  /// Each notification is broadcast to all subscribed and future observers, subject to buffer trimming policies.
  10. /// </summary>
  11. /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  12. public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
  13. {
  14. #region Fields
  15. /// <summary>
  16. /// Underlying optimized implementation of the replay subject.
  17. /// </summary>
  18. private readonly IReplaySubjectImplementation _implementation;
  19. #endregion
  20. #region Constructors
  21. #region All
  /// <summary>
  /// Creates a new <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> by delegating to the
  /// buffer-size constructor with <c>int.MaxValue</c>, which selects the implementation that never trims.
  /// </summary>
  public ReplaySubject()
      : this(int.MaxValue)
  {
      // Nothing else to do; the chained constructor picks the implementation.
  }
  /// <summary>
  /// Creates a new <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> that uses the given scheduler
  /// and applies neither a count limit nor a time limit to its replay buffer.
  /// </summary>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public ReplaySubject(IScheduler scheduler)
  {
      // A scheduler is involved, so the time/scheduler based implementation is required.
      _implementation = new ReplayByTime(scheduler);
  }
  38. #endregion
  39. #region Count
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size.
  /// </summary>
  /// <remarks>
  /// The buffer size selects a specialized implementation: a single-slot buffer for <c>1</c>, an untrimmed
  /// buffer for <c>int.MaxValue</c>, and a count-trimmed queue for every other value.
  /// </remarks>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize)
  {
      //
      // Validate up front so the exception names the caller's parameter. Without this check a
      // negative size only fails later, inside the queue allocation, and reports "capacity".
      //
      if (bufferSize < 0)
          throw new ArgumentOutOfRangeException("bufferSize");

      switch (bufferSize)
      {
          case 1:
              _implementation = new ReplayOne();
              break;
          case int.MaxValue:
              _implementation = new ReplayAll();
              break;
          default:
              _implementation = new ReplayMany(bufferSize);
              break;
      }
  }
  /// <summary>
  /// Creates a new <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> whose replay buffer is limited
  /// to a number of elements and whose observers are driven through the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize, IScheduler scheduler)
  {
      // Argument validation happens inside the implementation's constructor.
      _implementation = new ReplayByTime(bufferSize, scheduler);
  }
  71. #endregion
  72. #region Time
  /// <summary>
  /// Creates a new <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> whose replay buffer is limited
  /// to the given time window. The implementation's default scheduler is used.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(TimeSpan window)
  {
      // Argument validation happens inside the implementation's constructor.
      _implementation = new ReplayByTime(window);
  }
  /// <summary>
  /// Creates a new <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> whose replay buffer is limited
  /// to the given time window and whose observers are driven through the given scheduler.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(TimeSpan window, IScheduler scheduler)
  {
      // Argument validation happens inside the implementation's constructor.
      _implementation = new ReplayByTime(window, scheduler);
  }
  93. #endregion
  94. #region Count & Time
  /// <summary>
  /// Creates a new <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> whose replay buffer is limited
  /// both by element count and by time window. The implementation's default scheduler is used.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window)
  {
      // Argument validation happens inside the implementation's constructor.
      _implementation = new ReplayByTime(bufferSize, window);
  }
  /// <summary>
  /// Creates a new <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> whose replay buffer is limited
  /// both by element count and by time window, and whose observers are driven through the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
  {
      // Argument validation happens inside the implementation's constructor.
      _implementation = new ReplayByTime(bufferSize, window, scheduler);
  }
  117. #endregion
  118. #endregion
  119. #region Properties
  /// <summary>
  /// Gets a value indicating whether at least one observer is currently subscribed,
  /// as reported by the underlying implementation.
  /// </summary>
  public bool HasObservers
  {
      get
      {
          return _implementation.HasObservers;
      }
  }
  127. #endregion
  128. #region Methods
  129. #region Observer implementation
  /// <summary>
  /// Forwards an element to the underlying implementation, which records it for replay
  /// and notifies the subscribed observers.
  /// </summary>
  /// <param name="value">The value to send to all observers.</param>
  public void OnNext(T value)
  {
      _implementation.OnNext(value);
  }
  /// <summary>
  /// Forwards an error to the underlying implementation, which terminates the sequence
  /// for all subscribed and future observers.
  /// </summary>
  /// <param name="error">The exception to send to all observers.</param>
  /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
  public void OnError(Exception error)
  {
      // The null check is performed by the implementation.
      _implementation.OnError(error);
  }
  /// <summary>
  /// Forwards the completion signal to the underlying implementation, which ends the sequence
  /// for all subscribed and future observers.
  /// </summary>
  public void OnCompleted()
  {
      _implementation.OnCompleted();
  }
  154. #endregion
  155. #region IObservable implementation
  /// <summary>
  /// Registers an observer with the underlying implementation.
  /// </summary>
  /// <param name="observer">Observer to subscribe to the subject.</param>
  /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public IDisposable Subscribe(IObserver<T> observer)
  {
      // The null check is performed by the implementation.
      var subscription = _implementation.Subscribe(observer);
      return subscription;
  }
  166. #endregion
  167. #region IDisposable implementation
  /// <summary>
  /// Disposes the underlying implementation, releasing the resources held by this
  /// <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> and dropping all of its observers.
  /// </summary>
  public void Dispose()
  {
      _implementation.Dispose();
  }
  175. #endregion
  176. #endregion
  /// <summary>
  /// Contract shared by all internal replay strategies: a disposable subject that can also
  /// report whether it currently has observers.
  /// </summary>
  private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
  {
      /// <summary>
      /// Indicates whether the implementation has observers subscribed to it.
      /// </summary>
      bool HasObservers { get; }
  }
  /// <summary>
  /// Common skeleton for all replay strategies. It owns the observer list, the terminal state and
  /// the disposal flag, and serializes every state change under a single gate. Derived classes
  /// supply the buffering policy through <see cref="Next"/>, <see cref="Replay"/>,
  /// <see cref="Trim"/>, <see cref="DisposeCore"/> and <see cref="CreateScheduledObserver"/>.
  /// </summary>
  private abstract class ReplayBase : IReplaySubjectImplementation
  {
      // Guards every field below as well as the calls into the derived buffering policy.
      private readonly object _gate = new object();

      private ImmutableList<IScheduledObserver<T>> _observers;
      private bool _isStopped;
      private Exception _error;
      private bool _isDisposed;

      /// <summary>
      /// Starts out active, without an error and with an empty observer list.
      /// </summary>
      public ReplayBase()
      {
          _error = null;
          _isStopped = false;
          _observers = ImmutableList<IScheduledObserver<T>>.Empty;
      }

      /// <summary>
      /// Indicates whether the observer list is present (it is set to null on disposal) and non-empty.
      /// The field is read without taking the gate.
      /// </summary>
      public bool HasObservers
      {
          get
          {
              var snapshot = _observers;
              if (snapshot == null)
                  return false;

              return snapshot.Data.Length > 0;
          }
      }

      /// <summary>
      /// Buffers the value, trims the buffer and hands the value to every current observer.
      /// Does nothing once the sequence has terminated.
      /// </summary>
      /// <param name="value">Value to record and publish.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnNext(T value)
      {
          IScheduledObserver<T>[] targets = null;

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  Next(value);
                  Trim();

                  targets = _observers.Data;
                  for (var i = 0; i < targets.Length; i++)
                      targets[i].OnNext(value);
              }
          }

          // Activation happens outside the gate.
          EnsureAllActive(targets);
      }

      /// <summary>
      /// Terminates the sequence with an error: records the error for future subscribers, hands it
      /// to every current observer and resets the observer list. Does nothing if already terminated.
      /// </summary>
      /// <param name="error">Error to publish.</param>
      /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnError(Exception error)
      {
          if (error == null)
              throw new ArgumentNullException("error");

          IScheduledObserver<T>[] targets = null;

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  _isStopped = true;
                  _error = error;
                  Trim();

                  targets = _observers.Data;
                  for (var i = 0; i < targets.Length; i++)
                      targets[i].OnError(error);

                  _observers = ImmutableList<IScheduledObserver<T>>.Empty;
              }
          }

          EnsureAllActive(targets);
      }

      /// <summary>
      /// Terminates the sequence normally: hands the completion to every current observer and
      /// resets the observer list. Does nothing if already terminated.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnCompleted()
      {
          IScheduledObserver<T>[] targets = null;

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  _isStopped = true;
                  Trim();

                  targets = _observers.Data;
                  for (var i = 0; i < targets.Length; i++)
                      targets[i].OnCompleted();

                  _observers = ImmutableList<IScheduledObserver<T>>.Empty;
              }
          }

          EnsureAllActive(targets);
      }

      /// <summary>
      /// Subscribes an observer: wraps it in a scheduled observer, replays the buffered values
      /// followed by the terminal notification (if any), and then activates the wrapper.
      /// </summary>
      /// <param name="observer">Observer to subscribe.</param>
      /// <returns>Disposable that disposes the wrapper and removes it from the observer list.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (observer == null)
              throw new ArgumentNullException("observer");

          var scheduled = CreateScheduledObserver(observer);
          var pending = 0;
          var subscription = new Subscription(this, scheduled);

          lock (_gate)
          {
              CheckDisposed();

              //
              // Trim is always called here, even after the sequence has terminated; this keeps
              // the v1.x behavior for compatibility reasons.
              //
              // The consequence (open to debate) is that a terminated sequence "dies out": with a
              // time-based policy, buffered OnNext notifications keep being dropped as time passes.
              // In v1.x this came from trimming on scheduler.Now for everything but the terminal
              // message; using the IStopwatch has the same effect. The final notification is
              // always observed, but the buffer itself cannot be retained. One alternative is to
              // apply an unbounded ReplaySubject to the time-based TakeLast operator.
              //
              Trim();

              _observers = _observers.Add(scheduled);

              // Count every notification handed to the wrapper so it can be told how many to process.
              pending = Replay(scheduled);

              if (_error != null)
              {
                  pending++;
                  scheduled.OnError(_error);
              }
              else if (_isStopped)
              {
                  pending++;
                  scheduled.OnCompleted();
              }
          }

          scheduled.EnsureActive(pending);

          return subscription;
      }

      /// <summary>
      /// Marks the subject as disposed, drops the observer list and lets the derived class
      /// release its buffer.
      /// </summary>
      public void Dispose()
      {
          lock (_gate)
          {
              _isDisposed = true;
              _observers = null;
              DisposeCore();
          }
      }

      /// <summary>Releases the buffer owned by the derived class. Called under the gate.</summary>
      protected abstract void DisposeCore();

      /// <summary>Records a value in the buffer of the derived class. Called under the gate.</summary>
      protected abstract void Next(T value);

      /// <summary>
      /// Hands the buffered values to the given observer. Called under the gate.
      /// </summary>
      /// <returns>Number of values handed to the observer.</returns>
      protected abstract int Replay(IObserver<T> observer);

      /// <summary>Applies the trimming policy of the derived class. Called under the gate.</summary>
      protected abstract void Trim();

      /// <summary>Wraps an observer in the scheduled observer type used by the derived class.</summary>
      protected abstract IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer);

      /// <summary>
      /// Calls EnsureActive on every observer in the snapshot; a null snapshot means nothing was published.
      /// </summary>
      private static void EnsureAllActive(IScheduledObserver<T>[] targets)
      {
          if (targets == null)
              return;

          foreach (var target in targets)
              target.EnsureActive();
      }

      /// <summary>
      /// Throws when the subject has been disposed.
      /// </summary>
      private void CheckDisposed()
      {
          if (_isDisposed)
              throw new ObjectDisposedException(string.Empty);
      }

      /// <summary>
      /// Removes a scheduled observer from the list, unless the subject has been disposed
      /// (in which case the list is already gone).
      /// </summary>
      private void Unsubscribe(IScheduledObserver<T> observer)
      {
          lock (_gate)
          {
              if (_isDisposed)
                  return;

              _observers = _observers.Remove(observer);
          }
      }

      /// <summary>
      /// Handle returned from Subscribe; disposing it disposes the scheduled observer and
      /// detaches it from the owning subject.
      /// </summary>
      private sealed class Subscription : IDisposable
      {
          private readonly ReplayBase _subject;
          private readonly IScheduledObserver<T> _observer;

          public Subscription(ReplayBase subject, IScheduledObserver<T> observer)
          {
              _observer = observer;
              _subject = subject;
          }

          public void Dispose()
          {
              // Dispose the wrapper first, then take it off the list.
              _observer.Dispose();
              _subject.Unsubscribe(_observer);
          }
      }
  }
  /// <summary>
  /// Scheduler-aware replay strategy. Values are stamped with the elapsed time of a stopwatch
  /// obtained from the scheduler, and the buffer is bounded both by element count and by age.
  /// </summary>
  private sealed class ReplayByTime : ReplayBase
  {
      // Sentinel used by the overloads that do not limit the element count.
      private const int InfiniteBufferSize = int.MaxValue;

      private readonly int _bufferSize;
      private readonly TimeSpan _window;
      private readonly IScheduler _scheduler;
      private readonly IStopwatch _stopwatch;
      private readonly Queue<TimeInterval<T>> _queue;

      /// <summary>
      /// Creates the strategy with explicit limits and scheduler; all other overloads chain here.
      /// </summary>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
      public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
      {
          if (bufferSize < 0)
              throw new ArgumentOutOfRangeException("bufferSize");

          if (window < TimeSpan.Zero)
              throw new ArgumentOutOfRangeException("window");

          if (scheduler == null)
              throw new ArgumentNullException("scheduler");

          _bufferSize = bufferSize;
          _window = window;
          _scheduler = scheduler;

          _stopwatch = _scheduler.StartStopwatch();
          _queue = new Queue<TimeInterval<T>>();
      }

      /// <summary>Count and time limits, default scheduler.</summary>
      public ReplayByTime(int bufferSize, TimeSpan window)
          : this(bufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      /// <summary>No count limit, maximum time window, explicit scheduler.</summary>
      public ReplayByTime(IScheduler scheduler)
          : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      /// <summary>Count limit, maximum time window, explicit scheduler.</summary>
      public ReplayByTime(int bufferSize, IScheduler scheduler)
          : this(bufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      /// <summary>No count limit, explicit time window and scheduler.</summary>
      public ReplayByTime(TimeSpan window, IScheduler scheduler)
          : this(InfiniteBufferSize, window, scheduler)
      {
      }

      /// <summary>No count limit, explicit time window, default scheduler.</summary>
      public ReplayByTime(TimeSpan window)
          : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      /// <summary>
      /// Wraps the observer in a scheduled observer bound to this strategy's scheduler.
      /// </summary>
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
      {
          return new ScheduledObserver<T>(_scheduler, observer);
      }

      /// <summary>
      /// Drops all buffered values.
      /// </summary>
      protected override void DisposeCore()
      {
          _queue.Clear();
      }

      /// <summary>
      /// Appends the value to the buffer, stamped with the stopwatch's current elapsed time.
      /// </summary>
      protected override void Next(T value)
      {
          var stamped = new TimeInterval<T>(value, _stopwatch.Elapsed);
          _queue.Enqueue(stamped);
      }

      /// <summary>
      /// Hands every buffered value, oldest first, to the observer.
      /// </summary>
      /// <returns>Number of values handed over.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          var replayed = 0;

          foreach (var entry in _queue)
          {
              observer.OnNext(entry.Value);
              replayed++;
          }

          return replayed;
      }

      /// <summary>
      /// Enforces both limits: first the element count, then the age of the oldest entries
      /// relative to the stopwatch's current elapsed time.
      /// </summary>
      protected override void Trim()
      {
          var now = _stopwatch.Elapsed;

          while (_queue.Count > _bufferSize)
              _queue.Dequeue();

          while (_queue.Count > 0)
          {
              var age = now - _queue.Peek().Interval;
              if (!(age > _window))
                  break;

              _queue.Dequeue();
          }
      }
  }
  //
  // Below are the non-time based implementations.
  // These remove the need for the scheduler indirection, ScheduledObserver instances, the stopwatch and TimeInterval, and for ensuring the scheduled observers are active after each action.
  // The ReplayOne implementation also removes the need to even have a queue.
  //
  /// <summary>
  /// Replay strategy that remembers only the most recent value, held in a single field
  /// instead of a queue.
  /// </summary>
  private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
  {
      // Distinguishes "no value yet" from a stored default(T).
      private bool _hasValue;
      private T _value;

      /// <summary>
      /// Nothing to trim: a new value overwrites the previous one.
      /// </summary>
      protected override void Trim()
      {
      }

      /// <summary>
      /// Stores the value as the one to replay.
      /// </summary>
      protected override void Next(T value)
      {
          _value = value;
          _hasValue = true;
      }

      /// <summary>
      /// Hands the stored value, if there is one, to the observer.
      /// </summary>
      /// <returns>1 when a value was handed over; otherwise 0.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          if (!_hasValue)
              return 0;

          observer.OnNext(_value);
          return 1;
      }

      /// <summary>
      /// Releases the stored value.
      /// </summary>
      protected override void DisposeCore()
      {
          _value = default(T);
      }
  }
  /// <summary>
  /// Replay strategy that keeps at most a fixed number of the most recent values.
  /// </summary>
  private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
  {
      private readonly int _bufferSize;

      /// <summary>
      /// Creates the strategy; the size is also passed to the base class as the queue size hint.
      /// </summary>
      /// <param name="bufferSize">Maximum number of values to retain.</param>
      public ReplayMany(int bufferSize)
          : base(bufferSize)
      {
          _bufferSize = bufferSize;
      }

      /// <summary>
      /// Discards the oldest values until the queue holds no more than the configured count.
      /// </summary>
      protected override void Trim()
      {
          while (_queue.Count > _bufferSize)
          {
              _queue.Dequeue();
          }
      }
  }
  /// <summary>
  /// Replay strategy that retains every value it has seen.
  /// </summary>
  private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
  {
      /// <summary>
      /// Creates the strategy, passing 0 to the base class as the queue size hint.
      /// </summary>
      public ReplayAll()
          : base(0)
      {
      }

      /// <summary>
      /// Intentionally empty: all values are kept.
      /// </summary>
      protected override void Trim()
      {
      }
  }
  /// <summary>
  /// Base class of the scheduler-free strategies; observers are wrapped in a
  /// <see cref="FastImmediateObserver{T}"/> instead of a scheduler-bound observer.
  /// </summary>
  private abstract class ReplayBufferBase : ReplayBase
  {
      /// <summary>
      /// Wraps the observer in a <see cref="FastImmediateObserver{T}"/>.
      /// </summary>
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
      {
          var wrapper = new FastImmediateObserver<T>(observer);
          return wrapper;
      }

      /// <summary>
      /// No resources are held at this level; derived classes override this when they own a buffer.
      /// </summary>
      protected override void DisposeCore()
      {
      }
  }
  /// <summary>
  /// Base class of the queue-backed, scheduler-free strategies. It owns the queue; derived
  /// classes only decide how the queue is trimmed.
  /// </summary>
  private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
  {
      protected readonly Queue<T> _queue;

      /// <summary>
      /// Allocates the queue with an initial capacity of <paramref name="queueSize"/>, capped at 64.
      /// </summary>
      /// <param name="queueSize">Requested initial capacity of the queue.</param>
      protected ReplayManyBase(int queueSize)
          : base()
      {
          var initialCapacity = queueSize < 64 ? queueSize : 64;
          _queue = new Queue<T>(initialCapacity);
      }

      /// <summary>
      /// Appends the value to the queue.
      /// </summary>
      protected override void Next(T value)
      {
          _queue.Enqueue(value);
      }

      /// <summary>
      /// Hands every queued value, oldest first, to the observer.
      /// </summary>
      /// <returns>Number of values handed over.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          var replayed = 0;

          foreach (var value in _queue)
          {
              observer.OnNext(value);
              replayed++;
          }

          return replayed;
      }

      /// <summary>
      /// Drops all queued values.
      /// </summary>
      protected override void DisposeCore()
      {
          _queue.Clear();
      }
  }
  531. }
  /// <summary>
  /// Scheduled observer that needs no scheduler: notifications are queued by the producer and
  /// drained synchronously by whichever thread calls EnsureActive while no other thread is draining.
  /// </summary>
  /// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
  class FastImmediateObserver<T> : IScheduledObserver<T>
  {
      /// <summary>
      /// Lock protecting the queues and flags below, and arbitrating who drains.
      /// </summary>
      private readonly object _gate = new object();

      /// <summary>
      /// Target of the forwarded notifications; replaced by a no-op observer once done.
      /// </summary>
      private volatile IObserver<T> _observer;

      /// <summary>
      /// Queue that receives incoming OnNext notifications.
      /// </summary>
      private Queue<T> _queue = new Queue<T>();

      /// <summary>
      /// Spare queue swapped in for _queue when a batch is taken for draining, so queues are
      /// reused when the subject keeps producing while a drain is in progress.
      /// </summary>
      private Queue<T> _queue2;

      /// <summary>
      /// Error received through OnError, if any.
      /// </summary>
      private Exception _error;

      /// <summary>
      /// Set once OnCompleted has been received.
      /// </summary>
      private bool _done;

      /// <summary>
      /// Set while some thread owns the drain loop.
      /// </summary>
      private bool _busy;

      /// <summary>
      /// Set when draining threw; from then on incoming work is dropped.
      /// </summary>
      private bool _hasFaulted;

      /// <summary>
      /// Creates a new scheduled observer that forwards to the specified observer.
      /// </summary>
      /// <param name="observer">Observer to forward notifications to.</param>
      public FastImmediateObserver(IObserver<T> observer)
      {
          _observer = observer;
      }

      /// <summary>
      /// Disposes the observer by swapping the target for a no-op observer.
      /// </summary>
      public void Dispose()
      {
          Done();
      }

      /// <summary>
      /// Signals that work may be pending. Either the thread that currently drains picks it up,
      /// or the calling thread becomes the owner and drains the notification queue itself.
      /// </summary>
      public void EnsureActive()
      {
          EnsureActive(1);
      }

      /// <summary>
      /// Signals that work may be pending. Either the thread that currently drains picks it up,
      /// or the calling thread becomes the owner and drains the notification queue itself.
      /// </summary>
      /// <param name="count">The number of enqueued notifications to process (ignored).</param>
      public void EnsureActive(int count)
      {
          lock (_gate)
          {
              //
              // Work is dropped after an earlier failure, and an active owner will pick up
              // whatever was enqueued; in both cases there is nothing for this thread to do.
              //
              if (_hasFaulted || _busy)
                  return;

              // Nobody is draining: this thread becomes the owner.
              _busy = true;
          }

          while (true)
          {
              Queue<T> batch = null;
              Exception error = null;
              var done = false;

              //
              // Take the pending notifications away from the producer side.
              //
              lock (_gate)
              {
                  if (_queue.Count > 0)
                  {
                      //
                      // Hand the producer the standby queue (or a fresh one) and keep the
                      // filled queue as the batch to drain. The standby slot stays empty
                      // until the batch has been drained and can be recycled.
                      //
                      batch = _queue;
                      _queue = _queue2 ?? new Queue<T>();
                      _queue2 = null;
                  }

                  if (_error != null)
                  {
                      error = _error;
                  }
                  else if (_done)
                  {
                      done = true;
                  }
                  else if (batch == null)
                  {
                      //
                      // Nothing to do at all: give up ownership so a later caller can
                      // become the owner.
                      //
                      _busy = false;
                      return;
                  }
              }

              try
              {
                  if (batch != null)
                  {
                      while (batch.Count > 0)
                      {
                          _observer.OnNext(batch.Dequeue());
                      }

                      // The drained queue is empty again; park it as the standby queue.
                      lock (_gate)
                      {
                          _queue2 = batch;
                      }
                  }

                  //
                  // Terminal notifications end the loop without releasing ownership, so no
                  // further processing takes place on this observer.
                  //
                  if (error != null)
                  {
                      Done().OnError(error);
                      return;
                  }

                  if (done)
                  {
                      Done().OnCompleted();
                      return;
                  }
              }
              catch
              {
                  //
                  // Ownership is not released here either; together with the fault flag this
                  // stops all further processing.
                  //
                  lock (_gate)
                  {
                      _hasFaulted = true;
                      _queue.Clear();
                  }

                  throw;
              }
          }
      }

      /// <summary>
      /// Records an OnCompleted notification, unless the observer has faulted.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              if (_hasFaulted)
                  return;

              _done = true;
          }
      }

      /// <summary>
      /// Records an OnError notification, unless the observer has faulted.
      /// </summary>
      /// <param name="error">Error of the notification.</param>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              if (_hasFaulted)
                  return;

              _error = error;
          }
      }

      /// <summary>
      /// Enqueues an OnNext notification, unless the observer has faulted.
      /// </summary>
      /// <param name="value">Value of the notification.</param>
      public void OnNext(T value)
      {
          lock (_gate)
          {
              if (_hasFaulted)
                  return;

              _queue.Enqueue(value);
          }
      }

      /// <summary>
      /// Atomically replaces the target with a no-op observer so no later notification reaches it.
      /// </summary>
      /// <returns>The previous target, to which a terminal notification can still be sent.</returns>
      private IObserver<T> Done()
      {
          var previous = Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
          return previous;
      }
  }
  778. }