// ReplaySubject.cs
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive.Subjects
  9. {
  10. /// <summary>
  11. /// Represents an object that is both an observable sequence as well as an observer.
  12. /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
  13. /// </summary>
  14. /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  15. public sealed class ReplaySubject<T> : SubjectBase<T>
  16. {
  17. #region Fields
  18. /// <summary>
  19. /// Underlying optimized implementation of the replay subject.
  20. /// </summary>
  21. private readonly SubjectBase<T> _implementation;
  22. #endregion
  23. #region Constructors
  24. #region All
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> without a count limit on its replay buffer.
  /// </summary>
  /// <remarks>
  /// Delegates to the buffer-size constructor with <see cref="int.MaxValue"/>, which
  /// selects the keep-everything implementation.
  /// </remarks>
  public ReplaySubject() : this(int.MaxValue) { }
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> that invokes its observers on the given scheduler.
  /// </summary>
  /// <param name="scheduler">Scheduler used to invoke the observers.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  public ReplaySubject(IScheduler scheduler) => _implementation = new ReplayByTime(scheduler);
  41. #endregion
  42. #region Count
  /// <summary>
  /// Initializes a new instance of the <see cref="ReplaySubject{T}" /> class with the specified buffer size.
  /// </summary>
  /// <remarks>
  /// A buffer size of 1 selects the single-value implementation, <see cref="int.MaxValue"/>
  /// selects the unbounded implementation, and any other value selects the queue-based
  /// implementation bounded by <paramref name="bufferSize"/>.
  /// </remarks>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize)
  {
      //
      // Validate here so the exception names the caller's parameter. Without this check a
      // negative size only fails later, inside the Queue<T> constructor, which reports
      // its own "capacity" parameter instead.
      //
      if (bufferSize < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(bufferSize));
      }

      switch (bufferSize)
      {
          case 1:
              _implementation = new ReplayOne();
              break;
          case int.MaxValue:
              _implementation = new ReplayAll();
              break;
          default:
              _implementation = new ReplayMany(bufferSize);
              break;
      }
  }
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> with a count-limited replay buffer whose
  /// observers are invoked on the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="scheduler">Scheduler used to invoke the observers.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize, IScheduler scheduler) =>
      _implementation = new ReplayByTime(bufferSize, scheduler);
  74. #endregion
  75. #region Time
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> with a time-limited replay buffer.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
  public ReplaySubject(TimeSpan window) => _implementation = new ReplayByTime(window);
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> with a time-limited replay buffer whose
  /// observers are invoked on the given scheduler.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler used to invoke the observers.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
  public ReplaySubject(TimeSpan window, IScheduler scheduler) =>
      _implementation = new ReplayByTime(window, scheduler);
  96. #endregion
  97. #region Count & Time
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> whose replay buffer is limited both by
  /// element count and by time.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window) =>
      _implementation = new ReplayByTime(bufferSize, window);
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> whose replay buffer is limited both by
  /// element count and by time, and whose observers are invoked on the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler used to invoke the observers.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler) =>
      _implementation = new ReplayByTime(bufferSize, window, scheduler);
  120. #endregion
  121. #endregion
  122. #region Properties
  /// <summary>
  /// Gets a value indicating whether at least one observer is currently subscribed,
  /// as reported by the underlying implementation.
  /// </summary>
  public override bool HasObservers
  {
      get { return _implementation.HasObservers; }
  }
  /// <summary>
  /// Gets a value indicating whether the subject has been disposed, as reported by the
  /// underlying implementation.
  /// </summary>
  public override bool IsDisposed
  {
      get { return _implementation.IsDisposed; }
  }
  131. #endregion
  132. #region Methods
  133. #region IObserver<T> implementation
  /// <summary>
  /// Forwards an element to the underlying implementation, which delivers it to the
  /// subscribed observers and buffers it for future ones.
  /// </summary>
  /// <param name="value">The value to send to all observers.</param>
  public override void OnNext(T value)
  {
      _implementation.OnNext(value);
  }
  /// <summary>
  /// Forwards an error to the underlying implementation, which notifies all subscribed
  /// and future observers about it.
  /// </summary>
  /// <param name="error">The exception to send to all observers.</param>
  /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
  public override void OnError(Exception error)
  {
      // The null check runs before anything is handed to the implementation.
      _implementation.OnError(error ?? throw new ArgumentNullException(nameof(error)));
  }
  /// <summary>
  /// Forwards the end-of-sequence signal to the underlying implementation, which
  /// notifies all subscribed and future observers.
  /// </summary>
  public override void OnCompleted()
  {
      _implementation.OnCompleted();
  }
  156. #endregion
  157. #region IObservable<T> implementation
  /// <summary>
  /// Subscribes the given observer by delegating to the underlying implementation.
  /// </summary>
  /// <param name="observer">Observer to subscribe to the subject.</param>
  /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
  public override IDisposable Subscribe(IObserver<T> observer)
  {
      // The null check runs before anything is handed to the implementation.
      return _implementation.Subscribe(observer ?? throw new ArgumentNullException(nameof(observer)));
  }
  172. #endregion
  173. #region IDisposable implementation
  /// <summary>
  /// Disposes the underlying implementation, releasing the resources held by this
  /// <see cref="ReplaySubject{T}"/> and dropping all of its observers.
  /// </summary>
  public override void Dispose()
  {
      _implementation.Dispose();
  }
  178. #endregion
  179. #endregion
  /// <summary>
  /// Common core of the replay implementations. It owns the observer list, the terminal
  /// state and the gate that protects them. Derived classes supply the buffering
  /// (<c>Next</c>, <c>Replay</c>, <c>Trim</c>, <c>DisposeCore</c>) and choose the kind of
  /// scheduled observer that wraps each subscriber (<c>CreateScheduledObserver</c>).
  /// </summary>
  private abstract class ReplayBase : SubjectBase<T>
  {
      private readonly object _gate = new object();

      private ImmutableList<IScheduledObserver<T>> _observers = ImmutableList<IScheduledObserver<T>>.Empty;
      private Exception _error;
      private bool _isStopped;
      private bool _isDisposed;

      protected ReplayBase()
      {
      }

      /// <summary>
      /// Gets whether the observer list is non-empty. The list is <c>null</c> after
      /// disposal, in which case the result is <c>false</c>. Read without taking the gate.
      /// </summary>
      public override bool HasObservers
      {
          get
          {
              var current = _observers;
              return current != null && current.Data.Length > 0;
          }
      }

      /// <summary>
      /// Gets whether <see cref="Dispose"/> has been called; read under the gate.
      /// </summary>
      public override bool IsDisposed
      {
          get
          {
              lock (_gate)
              {
                  return _isDisposed;
              }
          }
      }

      /// <summary>
      /// Buffers the value, trims the buffer and enqueues the value on every current
      /// observer while holding the gate; the observers are activated after the gate is
      /// released. Does nothing once the subject has terminated.
      /// </summary>
      /// <param name="value">Value to publish.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override void OnNext(T value)
      {
          IScheduledObserver<T>[] targets;

          lock (_gate)
          {
              CheckDisposed();

              if (_isStopped)
              {
                  return;
              }

              Next(value);
              Trim();

              targets = _observers.Data;
              for (var i = 0; i < targets.Length; i++)
              {
                  targets[i].OnNext(value);
              }
          }

          EnsureAllActive(targets);
      }

      /// <summary>
      /// Marks the subject as terminated with the given error, enqueues the error on every
      /// current observer and clears the observer list, all under the gate; the observers
      /// are activated after the gate is released. Does nothing if already terminated.
      /// </summary>
      /// <param name="error">Error to publish and to retain for future subscribers.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override void OnError(Exception error)
      {
          IScheduledObserver<T>[] targets;

          lock (_gate)
          {
              CheckDisposed();

              if (_isStopped)
              {
                  return;
              }

              _isStopped = true;
              _error = error;
              Trim();

              targets = _observers.Data;
              for (var i = 0; i < targets.Length; i++)
              {
                  targets[i].OnError(error);
              }

              _observers = ImmutableList<IScheduledObserver<T>>.Empty;
          }

          EnsureAllActive(targets);
      }

      /// <summary>
      /// Marks the subject as terminated, enqueues the completion on every current observer
      /// and clears the observer list, all under the gate; the observers are activated
      /// after the gate is released. Does nothing if already terminated.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override void OnCompleted()
      {
          IScheduledObserver<T>[] targets;

          lock (_gate)
          {
              CheckDisposed();

              if (_isStopped)
              {
                  return;
              }

              _isStopped = true;
              Trim();

              targets = _observers.Data;
              for (var i = 0; i < targets.Length; i++)
              {
                  targets[i].OnCompleted();
              }

              _observers = ImmutableList<IScheduledObserver<T>>.Empty;
          }

          EnsureAllActive(targets);
      }

      /// <summary>
      /// Wraps the observer in a scheduled observer, replays the buffered values and any
      /// terminal notification into it, and registers it if the subject is still active.
      /// </summary>
      /// <param name="observer">Observer to subscribe.</param>
      /// <returns>
      /// A subscription that removes the observer when disposed, or
      /// <c>Disposable.Empty</c> when the subject has already terminated.
      /// </returns>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override IDisposable Subscribe(IObserver<T> observer)
      {
          var scheduled = CreateScheduledObserver(observer);
          int pending;
          IDisposable result = Disposable.Empty;

          lock (_gate)
          {
              CheckDisposed();

              //
              // Trim runs on every subscription, even after the sequence has terminated.
              // This keeps the v1.x behavior: buffered OnNext notifications keep being
              // dropped after termination (the sequence "dies out"), while the terminal
              // notification itself is always observed. Whether trimming should only
              // apply to active sequences is debatable; the behavior is kept as-is for
              // compatibility. To retain the values, one approach is to apply an
              // unbounded ReplaySubject to the time-based TakeLast operator.
              //
              Trim();

              pending = Replay(scheduled);

              if (_error != null)
              {
                  pending++;
                  scheduled.OnError(_error);
              }
              else if (_isStopped)
              {
                  pending++;
                  scheduled.OnCompleted();
              }

              if (!_isStopped)
              {
                  result = new Subscription(this, scheduled);
                  _observers = _observers.Add(scheduled);
              }
          }

          // Activate outside the gate, passing the number of notifications enqueued above.
          scheduled.EnsureActive(pending);

          return result;
      }

      /// <summary>
      /// Flags the subject as disposed, drops the observer list and lets the derived class
      /// release its buffer, all under the gate.
      /// </summary>
      public override void Dispose()
      {
          lock (_gate)
          {
              _isDisposed = true;
              _observers = null;
              DisposeCore();
          }
      }

      /// <summary>Releases the buffer of the derived class; called under the gate.</summary>
      protected abstract void DisposeCore();

      /// <summary>Records a value in the replay buffer; called under the gate.</summary>
      protected abstract void Next(T value);

      /// <summary>
      /// Sends the buffered values to the observer; called under the gate.
      /// </summary>
      /// <returns>Number of values sent.</returns>
      protected abstract int Replay(IObserver<T> observer);

      /// <summary>Applies the buffer trimming policy; called under the gate.</summary>
      protected abstract void Trim();

      /// <summary>Wraps a subscriber in the scheduled observer type used by the derived class.</summary>
      protected abstract IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer);

      /// <summary>
      /// Activates each observer of the snapshot; invoked after the gate has been released.
      /// </summary>
      private static void EnsureAllActive(IScheduledObserver<T>[] targets)
      {
          if (targets == null)
          {
              return;
          }

          for (var i = 0; i < targets.Length; i++)
          {
              targets[i].EnsureActive();
          }
      }

      /// <summary>Throws if the subject has been disposed; callers hold the gate.</summary>
      private void CheckDisposed()
      {
          if (_isDisposed)
          {
              throw new ObjectDisposedException(string.Empty);
          }
      }

      /// <summary>
      /// Removes the observer from the list, unless the subject was disposed (the list is
      /// <c>null</c> in that case).
      /// </summary>
      private void Unsubscribe(IScheduledObserver<T> observer)
      {
          lock (_gate)
          {
              if (_isDisposed)
              {
                  return;
              }

              _observers = _observers.Remove(observer);
          }
      }

      /// <summary>
      /// Handle returned from <c>Subscribe</c>; disposing it disposes the scheduled
      /// observer and then removes it from the subject.
      /// </summary>
      private sealed class Subscription : IDisposable
      {
          private readonly ReplayBase _subject;
          private readonly IScheduledObserver<T> _observer;

          public Subscription(ReplayBase subject, IScheduledObserver<T> observer)
          {
              _subject = subject;
              _observer = observer;
          }

          public void Dispose()
          {
              _observer.Dispose();
              _subject.Unsubscribe(_observer);
          }
      }
  }
  /// <summary>
  /// Replay implementation that supports scheduling and a time window: every value is
  /// stamped with the stopwatch reading at arrival, and the buffer is trimmed by both
  /// element count and age.
  /// </summary>
  private sealed class ReplayByTime : ReplayBase
  {
      private const int InfiniteBufferSize = int.MaxValue;

      private readonly int _bufferSize;
      private readonly TimeSpan _window;
      private readonly IScheduler _scheduler;
      private readonly IStopwatch _stopwatch;
      private readonly Queue<TimeInterval<T>> _queue;

      /// <summary>
      /// Creates the buffer with a count limit, a time window and a scheduler, and starts
      /// a stopwatch on that scheduler.
      /// </summary>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is negative, or <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
      public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
      {
          // Checks are ordered: bufferSize, then window, then scheduler.
          if (bufferSize < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(bufferSize));
          }

          if (window < TimeSpan.Zero)
          {
              throw new ArgumentOutOfRangeException(nameof(window));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          _bufferSize = bufferSize;
          _window = window;
          _scheduler = scheduler;
          _stopwatch = scheduler.StartStopwatch();
          _queue = new Queue<TimeInterval<T>>();
      }

      /// <summary>Count and time limits, using <c>SchedulerDefaults.Iteration</c>.</summary>
      public ReplayByTime(int bufferSize, TimeSpan window)
          : this(bufferSize, window, SchedulerDefaults.Iteration) { }

      /// <summary>No count or time limit, using the given scheduler.</summary>
      public ReplayByTime(IScheduler scheduler)
          : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler) { }

      /// <summary>Count limit only, using the given scheduler.</summary>
      public ReplayByTime(int bufferSize, IScheduler scheduler)
          : this(bufferSize, TimeSpan.MaxValue, scheduler) { }

      /// <summary>Time limit only, using the given scheduler.</summary>
      public ReplayByTime(TimeSpan window, IScheduler scheduler)
          : this(InfiniteBufferSize, window, scheduler) { }

      /// <summary>Time limit only, using <c>SchedulerDefaults.Iteration</c>.</summary>
      public ReplayByTime(TimeSpan window)
          : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration) { }

      /// <summary>Wraps the subscriber in a <c>ScheduledObserver</c> bound to the scheduler.</summary>
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer) =>
          new ScheduledObserver<T>(_scheduler, observer);

      /// <summary>Empties the buffer.</summary>
      protected override void DisposeCore() => _queue.Clear();

      /// <summary>Appends the value together with the current stopwatch reading.</summary>
      protected override void Next(T value)
      {
          var stamped = new TimeInterval<T>(value, _stopwatch.Elapsed);
          _queue.Enqueue(stamped);
      }

      /// <summary>Sends every buffered value, oldest first, and returns how many were sent.</summary>
      protected override int Replay(IObserver<T> observer)
      {
          var replayed = 0;

          foreach (var entry in _queue)
          {
              observer.OnNext(entry.Value);
              replayed++;
          }

          return replayed;
      }

      /// <summary>
      /// Drops the oldest entries until the count limit holds, then drops entries whose age
      /// (relative to one stopwatch reading taken up front) exceeds the window.
      /// </summary>
      protected override void Trim()
      {
          var now = _stopwatch.Elapsed;

          while (_queue.Count > _bufferSize)
          {
              _queue.Dequeue();
          }

          while (_queue.Count > 0)
          {
              var age = now - _queue.Peek().Interval;
              if (age <= _window)
              {
                  break;
              }

              _queue.Dequeue();
          }
      }
  }
  //
  // Below are the non-time-based implementations.
  // They remove the need for the scheduler indirection, ScheduledObserver instances, the stopwatch, TimeInterval values, and for ensuring the scheduled observers are active after each action.
  // The ReplayOne implementation also removes the need for a queue altogether.
  //
  /// <summary>
  /// Replay implementation for a buffer size of one: only the most recent value is kept,
  /// in a field rather than a queue.
  /// </summary>
  private sealed class ReplayOne : ReplayBufferBase
  {
      private T _value;
      private bool _hasValue;

      /// <summary>Nothing to trim: a new value simply overwrites the previous one.</summary>
      protected override void Trim()
      {
      }

      /// <summary>Stores the value as the latest one.</summary>
      protected override void Next(T value)
      {
          _value = value;
          _hasValue = true;
      }

      /// <summary>
      /// Sends the stored value, if any.
      /// </summary>
      /// <returns>1 when a value was sent; otherwise 0.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          if (!_hasValue)
          {
              return 0;
          }

          observer.OnNext(_value);
          return 1;
      }

      /// <summary>Resets the stored value to its default.</summary>
      protected override void DisposeCore()
      {
          _value = default(T);
      }
  }
  /// <summary>
  /// Queue-based replay implementation that keeps at most a fixed number of values.
  /// </summary>
  private sealed class ReplayMany : ReplayManyBase
  {
      private readonly int _bufferSize;

      /// <param name="bufferSize">Maximum number of values retained; also passed to the base class as the queue size hint.</param>
      public ReplayMany(int bufferSize) : base(bufferSize)
      {
          _bufferSize = bufferSize;
      }

      /// <summary>Removes the oldest values until the queue holds no more than the limit.</summary>
      protected override void Trim()
      {
          for (var excess = _queue.Count - _bufferSize; excess > 0; excess--)
          {
              _queue.Dequeue();
          }
      }
  }
  /// <summary>
  /// Queue-based replay implementation that never discards values.
  /// </summary>
  private sealed class ReplayAll : ReplayManyBase
  {
      /// <summary>Starts with a queue size hint of zero.</summary>
      public ReplayAll() : base(0) { }

      /// <summary>Intentionally empty: every value is retained.</summary>
      protected override void Trim()
      {
      }
  }
  /// <summary>
  /// Base for the non-time-based implementations; subscribers are wrapped in a
  /// <c>FastImmediateObserver</c> rather than a scheduler-bound observer.
  /// </summary>
  private abstract class ReplayBufferBase : ReplayBase
  {
      /// <summary>Wraps the subscriber in a <c>FastImmediateObserver</c>.</summary>
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer) =>
          new FastImmediateObserver<T>(observer);

      /// <summary>Nothing to release at this level.</summary>
      protected override void DisposeCore() { }
  }
  /// <summary>
  /// Base for the queue-backed, non-time-based implementations. Derived classes only
  /// decide how the queue is trimmed.
  /// </summary>
  private abstract class ReplayManyBase : ReplayBufferBase
  {
      protected readonly Queue<T> _queue;

      /// <param name="queueSize">Requested size; the initial queue capacity is this value capped at 64.</param>
      protected ReplayManyBase(int queueSize)
      {
          var initialCapacity = queueSize < 64 ? queueSize : 64;
          _queue = new Queue<T>(initialCapacity);
      }

      /// <summary>Appends the value to the queue.</summary>
      protected override void Next(T value) => _queue.Enqueue(value);

      /// <summary>Sends every queued value, oldest first, and returns how many were sent.</summary>
      protected override int Replay(IObserver<T> observer)
      {
          var replayed = 0;

          foreach (var element in _queue)
          {
              observer.OnNext(element);
              replayed++;
          }

          return replayed;
      }

      /// <summary>Empties the queue.</summary>
      protected override void DisposeCore() => _queue.Clear();
  }
  558. }
  /// <summary>
  /// Scheduled-observer variant that needs no scheduler: notifications are queued by the
  /// producer, and whichever thread calls <c>EnsureActive</c> while nobody else is draining
  /// becomes the owner and delivers them to the wrapped observer on its own thread.
  /// </summary>
  /// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
  internal sealed class FastImmediateObserver<T> : IScheduledObserver<T>
  {
      /// <summary>
      /// Guards the queues and state flags, and arbitrates which thread owns the drain loop.
      /// </summary>
      private readonly object _gate = new object();

      /// <summary>
      /// Target of all forwarded notifications; swapped for a no-op instance by <c>Done</c>.
      /// </summary>
      private volatile IObserver<T> _observer;

      /// <summary>
      /// Queue receiving incoming OnNext notifications.
      /// </summary>
      private Queue<T> _queue = new Queue<T>();

      /// <summary>
      /// Spare queue that takes the place of <c>_queue</c> when the owner steals a batch, so
      /// queues are recycled on busy subjects instead of being reallocated. It is
      /// <c>null</c> while a stolen batch is being drained.
      /// </summary>
      private Queue<T> _queue2;

      /// <summary>
      /// Error received through OnError, if any.
      /// </summary>
      private Exception _error;

      /// <summary>
      /// Set once OnCompleted has been received.
      /// </summary>
      private bool _done;

      /// <summary>
      /// Set while some thread owns the drain loop.
      /// </summary>
      private bool _busy;

      /// <summary>
      /// Set when draining threw; from then on all work is dropped.
      /// </summary>
      private bool _hasFaulted;

      /// <summary>
      /// Creates an observer that proxies to <paramref name="observer"/>.
      /// </summary>
      /// <param name="observer">Observer to forward notifications to.</param>
      public FastImmediateObserver(IObserver<T> observer)
      {
          _observer = observer;
      }

      /// <summary>
      /// Disposes the observer by replacing the target with the no-op instance.
      /// </summary>
      public void Dispose()
      {
          Done();
      }

      /// <summary>
      /// Signals that work is pending; equivalent to <c>EnsureActive(1)</c>.
      /// </summary>
      public void EnsureActive()
      {
          EnsureActive(1);
      }

      /// <summary>
      /// Signals that work is pending. If another thread is already draining, that thread
      /// will pick the work up; otherwise the calling thread becomes the owner and drains
      /// the queue until no work is left or a terminal notification has been delivered.
      /// </summary>
      /// <param name="count">The number of enqueued notifications to process (ignored).</param>
      public void EnsureActive(int count)
      {
          if (!TryBecomeOwner())
          {
              return;
          }

          while (true)
          {
              Queue<T> batch = null;
              Exception failure = null;
              var completed = false;

              //
              // Steal the pending notifications from the producer side.
              //
              lock (_gate)
              {
                  if (_queue.Count > 0)
                  {
                      //
                      // Take the filled queue and install the standby (or a fresh) queue
                      // in its place. The standby slot stays empty until the stolen
                      // batch has been fully delivered.
                      //
                      batch = _queue;
                      _queue = _queue2 ?? new Queue<T>();
                      _queue2 = null;
                  }

                  if (_error != null)
                  {
                      failure = _error;
                  }
                  else if (_done)
                  {
                      completed = true;
                  }
                  else if (batch == null)
                  {
                      //
                      // Nothing to do: give up ownership so a later caller can take over.
                      //
                      _busy = false;
                      return;
                  }
              }

              try
              {
                  if (batch != null)
                  {
                      while (batch.Count > 0)
                      {
                          _observer.OnNext(batch.Dequeue());
                      }

                      //
                      // The drained queue is empty again; park it as the standby.
                      //
                      lock (_gate)
                      {
                          _queue2 = batch;
                      }
                  }

                  //
                  // Terminal notifications end the loop without releasing ownership, so
                  // no further processing ever takes place on this observer.
                  //
                  if (failure != null)
                  {
                      Done().OnError(failure);
                      return;
                  }

                  if (completed)
                  {
                      Done().OnCompleted();
                      return;
                  }
              }
              catch
              {
                  lock (_gate)
                  {
                      _hasFaulted = true;
                      _queue.Clear();
                  }

                  throw;
              }
          }
      }

      /// <summary>
      /// Records an OnCompleted notification, unless the observer has faulted.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _done = true;
          }
      }

      /// <summary>
      /// Records an OnError notification, unless the observer has faulted.
      /// </summary>
      /// <param name="error">Error of the notification.</param>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _error = error;
          }
      }

      /// <summary>
      /// Enqueues an OnNext notification, unless the observer has faulted.
      /// </summary>
      /// <param name="value">Value of the notification.</param>
      public void OnNext(T value)
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _queue.Enqueue(value);
          }
      }

      /// <summary>
      /// Claims the drain loop for the calling thread.
      /// </summary>
      /// <returns>
      /// <c>true</c> if the caller is now the owner; <c>false</c> if the observer has
      /// faulted or another thread already owns the loop.
      /// </returns>
      private bool TryBecomeOwner()
      {
          lock (_gate)
          {
              if (_hasFaulted || _busy)
              {
                  return false;
              }

              _busy = true;
              return true;
          }
      }

      /// <summary>
      /// Atomically replaces the target observer with <c>NopObserver&lt;T&gt;.Instance</c>
      /// so later notifications no longer reach the original observer.
      /// </summary>
      /// <returns>The previous target, to which the terminal notification is sent.</returns>
      private IObserver<T> Done()
      {
          return Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
      }
  }
  805. }