ReplaySubject.cs 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive.Subjects
  9. {
  10. /// <summary>
  11. /// Represents an object that is both an observable sequence as well as an observer.
  12. /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
  13. /// </summary>
  14. /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  15. public sealed class ReplaySubject<T> : SubjectBase<T>, IDisposable
  16. {
  17. #region Fields
  18. /// <summary>
  19. /// Underlying optimized implementation of the replay subject.
  20. /// </summary>
  21. private readonly SubjectBase<T> _implementation;
  22. #endregion
  23. #region Constructors
  24. #region All
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> whose replay buffer is not bounded by element count.
  /// </summary>
  /// <remarks>
  /// Forwards to the buffer-size constructor with <see cref="int.MaxValue"/>.
  /// </remarks>
  public ReplaySubject()
      : this(int.MaxValue)
  {
  }
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> that notifies its observers on the given scheduler.
  /// </summary>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  public ReplaySubject(IScheduler scheduler)
  {
      // The scheduler-aware implementation validates the argument itself.
      _implementation = new ReplayByTime(scheduler);
  }
  41. #endregion
  42. #region Count
  /// <summary>
  /// Initializes a new instance of the <see cref="ReplaySubject{T}" /> class with the specified buffer size.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize)
  {
      // Validate here so the exception names this constructor's parameter instead of
      // surfacing incidentally from the queue allocation inside the buffered implementation.
      if (bufferSize < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(bufferSize));
      }

      switch (bufferSize)
      {
          case 1:
              // A single slot is enough; no queue is needed.
              _implementation = new ReplayOne();
              break;

          case int.MaxValue:
              // Unbounded buffer: nothing is ever trimmed.
              _implementation = new ReplayAll();
              break;

          default:
              // Bounded queue trimmed to bufferSize elements.
              _implementation = new ReplayMany(bufferSize);
              break;
      }
  }
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> with a bounded replay buffer whose observers are notified on the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize, IScheduler scheduler)
  {
      // Argument validation happens inside the scheduler-aware implementation.
      _implementation = new ReplayByTime(bufferSize, scheduler);
  }
  74. #endregion
  75. #region Time
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> whose replay buffer is limited by the age of its elements.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
  public ReplaySubject(TimeSpan window)
  {
      // Argument validation happens inside the time-based implementation.
      _implementation = new ReplayByTime(window);
  }
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> whose replay buffer is limited by element age and whose observers are notified on the given scheduler.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
  public ReplaySubject(TimeSpan window, IScheduler scheduler)
  {
      // Argument validation happens inside the time-based implementation.
      _implementation = new ReplayByTime(window, scheduler);
  }
  96. #endregion
  97. #region Count & Time
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> whose replay buffer is limited both by element count and by element age.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window)
  {
      // Argument validation happens inside the time-based implementation.
      _implementation = new ReplayByTime(bufferSize, window);
  }
  /// <summary>
  /// Creates a <see cref="ReplaySubject{T}" /> whose replay buffer is limited by element count and element age, and whose observers are notified on the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
  {
      // Argument validation happens inside the time-based implementation.
      _implementation = new ReplayByTime(bufferSize, window, scheduler);
  }
  120. #endregion
  121. #endregion
  122. #region Properties
  /// <summary>
  /// Gets a value indicating whether at least one observer is currently subscribed to the subject.
  /// </summary>
  public override bool HasObservers
  {
      get { return _implementation.HasObservers; }
  }
  /// <summary>
  /// Gets a value indicating whether the subject has been disposed.
  /// </summary>
  public override bool IsDisposed
  {
      get { return _implementation.IsDisposed; }
  }
  131. #endregion
  132. #region Methods
  133. #region IObserver<T> implementation
  /// <summary>
  /// Publishes an element to the subject by forwarding it to the underlying implementation,
  /// which delivers it to current observers and records it for replay to future ones.
  /// </summary>
  /// <param name="value">The value to send to all observers.</param>
  public override void OnNext(T value)
  {
      _implementation.OnNext(value);
  }
  /// <summary>
  /// Terminates the sequence with the specified exception, which is sent to all subscribed and future observers.
  /// </summary>
  /// <param name="error">The exception to send to all observers.</param>
  /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
  public override void OnError(Exception error)
  {
      // Reject a missing exception before touching the implementation.
      if (error == null)
      {
          throw new ArgumentNullException(nameof(error));
      }

      _implementation.OnError(error);
  }
  /// <summary>
  /// Signals the end of the sequence to all subscribed and future observers.
  /// </summary>
  public override void OnCompleted()
  {
      _implementation.OnCompleted();
  }
  154. #endregion
  155. #region IObservable<T> implementation
  /// <summary>
  /// Subscribes an observer to the subject by handing it to the underlying implementation.
  /// </summary>
  /// <param name="observer">Observer to subscribe to the subject.</param>
  /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
  public override IDisposable Subscribe(IObserver<T> observer)
  {
      // Reject a missing observer before touching the implementation.
      if (observer == null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      return _implementation.Subscribe(observer);
  }
  168. #endregion
  169. #region IDisposable implementation
  /// <summary>
  /// Releases all resources used by the current instance of the <see cref="ReplaySubject{T}"/> class
  /// and unsubscribes all observers by disposing the underlying implementation.
  /// </summary>
  public override void Dispose()
  {
      _implementation.Dispose();
  }
  174. #endregion
  175. #endregion
  /// <summary>
  /// Common machinery shared by all replay strategies. It owns the observer list and the
  /// stopped/error/disposed state, and serializes every operation behind a single gate.
  /// Derived classes supply the buffer (<see cref="Next"/>, <see cref="Replay"/>,
  /// <see cref="Trim"/>) and the kind of scheduled observer that wraps each subscriber.
  /// </summary>
  private abstract class ReplayBase : SubjectBase<T>
  {
      private readonly object _gate = new object();

      private ImmutableList<IScheduledObserver<T>> _observers;
      private bool _isStopped;
      private Exception _error;
      private bool _isDisposed;

      public ReplayBase()
      {
          // _isStopped and _error keep their default values (false / null).
          _observers = ImmutableList<IScheduledObserver<T>>.Empty;
      }

      /// <summary>
      /// Gets a value indicating whether the observer list is present and non-empty.
      /// The list is read without taking the gate.
      /// </summary>
      public override bool HasObservers
      {
          get
          {
              // The list is set to null by Dispose, hence the null check on a single snapshot.
              var snapshot = _observers;
              return snapshot != null && snapshot.Data.Length > 0;
          }
      }

      /// <summary>
      /// Gets a value indicating whether <see cref="Dispose"/> has been called.
      /// </summary>
      public override bool IsDisposed
      {
          get
          {
              lock (_gate)
              {
                  return _isDisposed;
              }
          }
      }

      /// <summary>
      /// Buffers the value, trims the buffer, and queues the value on every current observer.
      /// Does nothing once the subject has terminated.
      /// </summary>
      /// <param name="value">Element to publish.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override void OnNext(T value)
      {
          IScheduledObserver<T>[] targets = null;

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  Next(value);
                  Trim();

                  targets = _observers.Data;
                  for (var i = 0; i < targets.Length; i++)
                  {
                      targets[i].OnNext(value);
                  }
              }
          }

          // Activation happens after the gate has been released.
          EnsureAllActive(targets);
      }

      /// <summary>
      /// Marks the subject as stopped, remembers the error for later subscribers, queues the
      /// error on every current observer, and clears the observer list.
      /// Does nothing if the subject has already terminated.
      /// </summary>
      /// <param name="error">Exception to publish.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override void OnError(Exception error)
      {
          IScheduledObserver<T>[] targets = null;

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  _isStopped = true;
                  _error = error;
                  Trim();

                  targets = _observers.Data;
                  for (var i = 0; i < targets.Length; i++)
                  {
                      targets[i].OnError(error);
                  }

                  _observers = ImmutableList<IScheduledObserver<T>>.Empty;
              }
          }

          // Activation happens after the gate has been released.
          EnsureAllActive(targets);
      }

      /// <summary>
      /// Marks the subject as stopped, queues completion on every current observer, and clears
      /// the observer list. Does nothing if the subject has already terminated.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override void OnCompleted()
      {
          IScheduledObserver<T>[] targets = null;

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  _isStopped = true;
                  Trim();

                  targets = _observers.Data;
                  for (var i = 0; i < targets.Length; i++)
                  {
                      targets[i].OnCompleted();
                  }

                  _observers = ImmutableList<IScheduledObserver<T>>.Empty;
              }
          }

          // Activation happens after the gate has been released.
          EnsureAllActive(targets);
      }

      /// <summary>
      /// Wraps the observer in a scheduled observer, replays the buffered elements (and the
      /// terminal notification, if any) into it, and registers it for future notifications
      /// when the subject is still running.
      /// </summary>
      /// <param name="observer">Observer to subscribe.</param>
      /// <returns>
      /// A handle that removes the observer again, or <see cref="Disposable.Empty"/> when the
      /// subject has already terminated.
      /// </returns>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override IDisposable Subscribe(IObserver<T> observer)
      {
          var scheduled = CreateScheduledObserver(observer);
          var pending = 0;
          var subscription = Disposable.Empty;

          lock (_gate)
          {
              CheckDisposed();

              //
              // Trim is always called here, preserving the v1.x behavior.
              //
              // Whether this policy should only apply while the sequence is active is
              // debatable. As it stands, a terminated sequence "dies out": OnNext
              // notifications keep being dropped from the queue after termination.
              //
              // In v1.x this came from trimming on the clock value returned by
              // scheduler.Now, applied to everything but the terminal message in the
              // queue; using the IStopwatch has the same effect. Either way the final
              // notification is guaranteed to be observed, but the buffer itself cannot be
              // retained directly. One alternative is to apply an unbounded ReplaySubject
              // to the time-based TakeLast operator.
              //
              // The behavior is kept as-is for compatibility with v1.x.
              //
              Trim();

              pending = Replay(scheduled);

              if (_error != null)
              {
                  pending++;
                  scheduled.OnError(_error);
              }
              else if (_isStopped)
              {
                  pending++;
                  scheduled.OnCompleted();
              }

              if (!_isStopped)
              {
                  subscription = new Subscription(this, scheduled);
                  _observers = _observers.Add(scheduled);
              }
          }

          // Kick off delivery of the replayed notifications after the gate has been released.
          scheduled.EnsureActive(pending);

          return subscription;
      }

      /// <summary>
      /// Marks the subject as disposed, drops the observer list, and lets the derived class
      /// release its buffer.
      /// </summary>
      public override void Dispose()
      {
          lock (_gate)
          {
              _isDisposed = true;
              _observers = null;
              DisposeCore();
          }
      }

      /// <summary>Releases the buffer held by the derived class; called under the gate.</summary>
      protected abstract void DisposeCore();

      /// <summary>Records a published value in the replay buffer; called under the gate.</summary>
      protected abstract void Next(T value);

      /// <summary>
      /// Pushes the buffered values into the given observer and returns how many were pushed;
      /// called under the gate.
      /// </summary>
      protected abstract int Replay(IObserver<T> observer);

      /// <summary>Applies the buffer's trimming policy; called under the gate.</summary>
      protected abstract void Trim();

      /// <summary>Creates the scheduled wrapper used to deliver notifications to a subscriber.</summary>
      protected abstract IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer);

      /// <summary>
      /// Calls <c>EnsureActive</c> on every observer in the snapshot; a <c>null</c> snapshot
      /// (nothing was published) is ignored.
      /// </summary>
      private static void EnsureAllActive(IScheduledObserver<T>[] observers)
      {
          if (observers == null)
          {
              return;
          }

          for (var i = 0; i < observers.Length; i++)
          {
              observers[i].EnsureActive();
          }
      }

      /// <summary>Throws <see cref="ObjectDisposedException"/> once the subject has been disposed.</summary>
      private void CheckDisposed()
      {
          if (_isDisposed)
          {
              throw new ObjectDisposedException(string.Empty);
          }
      }

      /// <summary>Removes the observer from the list unless the subject has been disposed.</summary>
      private void Unsubscribe(IScheduledObserver<T> observer)
      {
          lock (_gate)
          {
              if (_isDisposed)
              {
                  // The list was dropped by Dispose; nothing to remove.
                  return;
              }

              _observers = _observers.Remove(observer);
          }
      }

      /// <summary>
      /// Subscription handle: disposing it disposes the scheduled observer and then removes it
      /// from the owning subject.
      /// </summary>
      private sealed class Subscription : IDisposable
      {
          private readonly ReplayBase _subject;
          private readonly IScheduledObserver<T> _observer;

          public Subscription(ReplayBase subject, IScheduledObserver<T> observer)
          {
              _subject = subject;
              _observer = observer;
          }

          public void Dispose()
          {
              _observer.Dispose();
              _subject.Unsubscribe(_observer);
          }
      }
  }
  /// <summary>
  /// Original ReplaySubject implementation with the time-based features: notifications are
  /// delivered through a scheduler, each buffered element is stamped using a stopwatch, and
  /// the buffer is trimmed by element count and by element age.
  /// </summary>
  private sealed class ReplayByTime : ReplayBase
  {
      private const int InfiniteBufferSize = int.MaxValue;

      private readonly int _bufferSize;
      private readonly TimeSpan _window;
      private readonly IScheduler _scheduler;
      private readonly IStopwatch _stopwatch;
      private readonly Queue<TimeInterval<T>> _queue;

      /// <summary>
      /// Creates the implementation with explicit count limit, age limit and scheduler.
      /// </summary>
      /// <exception cref="ArgumentOutOfRangeException">
      /// <paramref name="bufferSize"/> is negative, or <paramref name="window"/> is negative.
      /// </exception>
      /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
      public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
      {
          if (bufferSize < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(bufferSize));
          }

          if (window < TimeSpan.Zero)
          {
              throw new ArgumentOutOfRangeException(nameof(window));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          _bufferSize = bufferSize;
          _window = window;
          _scheduler = scheduler;

          // The stopwatch supplies the timestamps recorded by Next and compared by Trim.
          _stopwatch = _scheduler.StartStopwatch();
          _queue = new Queue<TimeInterval<T>>();
      }

      /// <summary>Count and age limits, using <c>SchedulerDefaults.Iteration</c> as the scheduler.</summary>
      public ReplayByTime(int bufferSize, TimeSpan window)
          : this(bufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      /// <summary>No count limit and the maximum window, on the given scheduler.</summary>
      public ReplayByTime(IScheduler scheduler)
          : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      /// <summary>Count limit with the maximum window, on the given scheduler.</summary>
      public ReplayByTime(int bufferSize, IScheduler scheduler)
          : this(bufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      /// <summary>Age limit with no count limit, on the given scheduler.</summary>
      public ReplayByTime(TimeSpan window, IScheduler scheduler)
          : this(InfiniteBufferSize, window, scheduler)
      {
      }

      /// <summary>Age limit with no count limit, using <c>SchedulerDefaults.Iteration</c> as the scheduler.</summary>
      public ReplayByTime(TimeSpan window)
          : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      /// <summary>Wraps the subscriber in a <see cref="ScheduledObserver{T}"/> bound to this subject's scheduler.</summary>
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
      {
          return new ScheduledObserver<T>(_scheduler, observer);
      }

      /// <summary>Empties the buffer.</summary>
      protected override void DisposeCore()
      {
          _queue.Clear();
      }

      /// <summary>Appends the value to the buffer together with the current stopwatch reading.</summary>
      protected override void Next(T value)
      {
          var stamp = _stopwatch.Elapsed;
          var entry = new TimeInterval<T>(value, stamp);
          _queue.Enqueue(entry);
      }

      /// <summary>Sends every buffered value to the observer, oldest first, and returns how many were sent.</summary>
      protected override int Replay(IObserver<T> observer)
      {
          var replayed = 0;

          foreach (var entry in _queue)
          {
              observer.OnNext(entry.Value);
              replayed++;
          }

          return replayed;
      }

      /// <summary>
      /// Drops the oldest entries until the buffer respects the count limit, then drops
      /// entries from the front whose age exceeds the window.
      /// </summary>
      protected override void Trim()
      {
          var now = _stopwatch.Elapsed;

          // Count-based limit.
          while (_queue.Count > _bufferSize)
          {
              _queue.Dequeue();
          }

          // Age-based limit: an entry expires once (now - stamp) is strictly greater than the window.
          while (_queue.Count > 0)
          {
              var age = now - _queue.Peek().Interval;
              if (age <= _window)
              {
                  break;
              }

              _queue.Dequeue();
          }
      }
  }
  //
  // Below are the non-time-based implementations.
  // These remove the need for the scheduler indirection, scheduled observers, the stopwatch, TimeInterval, and ensuring the scheduled observers are active after each action.
  // The ReplayOne implementation also removes the need to even have a queue.
  //
  /// <summary>
  /// Replay strategy that remembers only the most recent value, stored in a single field
  /// instead of a queue.
  /// </summary>
  private sealed class ReplayOne : ReplayBufferBase
  {
      private bool _hasValue;
      private T _value;

      /// <summary>Nothing to trim: the single slot is simply overwritten by <see cref="Next"/>.</summary>
      protected override void Trim()
      {
      }

      /// <summary>Overwrites the stored value and marks the slot as occupied.</summary>
      protected override void Next(T value)
      {
          _value = value;
          _hasValue = true;
      }

      /// <summary>
      /// Sends the stored value to the observer if one has been recorded.
      /// </summary>
      /// <returns>1 when a value was sent; otherwise 0.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          if (!_hasValue)
          {
              return 0;
          }

          observer.OnNext(_value);
          return 1;
      }

      /// <summary>Resets the stored value to the default of <typeparamref name="T"/>.</summary>
      protected override void DisposeCore()
      {
          _value = default(T);
      }
  }
  /// <summary>
  /// Replay strategy that keeps at most a fixed number of the most recent values.
  /// </summary>
  private sealed class ReplayMany : ReplayManyBase
  {
      private readonly int _bufferSize;

      /// <summary>Creates the buffer with the given maximum element count.</summary>
      public ReplayMany(int bufferSize)
          : base(bufferSize)
      {
          _bufferSize = bufferSize;
      }

      /// <summary>Removes the oldest values until no more than the configured count remains.</summary>
      protected override void Trim()
      {
          for (var excess = _queue.Count - _bufferSize; excess > 0; excess--)
          {
              _queue.Dequeue();
          }
      }
  }
  /// <summary>
  /// Replay strategy that retains every value ever published.
  /// </summary>
  private sealed class ReplayAll : ReplayManyBase
  {
      /// <summary>Creates the buffer, passing 0 as the queue size hint to the base class.</summary>
      public ReplayAll()
          : base(0)
      {
      }

      /// <summary>Intentionally empty: all values are kept.</summary>
      protected override void Trim()
      {
      }
  }
  /// <summary>
  /// Base for the replay strategies that do not use a scheduler: subscribers are wrapped in a
  /// <see cref="FastImmediateObserver{T}"/>.
  /// </summary>
  private abstract class ReplayBufferBase : ReplayBase
  {
      /// <summary>Wraps the subscriber in a <see cref="FastImmediateObserver{T}"/>.</summary>
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer) =>
          new FastImmediateObserver<T>(observer);

      /// <summary>No buffer is owned at this level, so there is nothing to release.</summary>
      protected override void DisposeCore()
      {
      }
  }
  /// <summary>
  /// Base for the queue-backed replay strategies; derived classes only decide how the queue
  /// is trimmed.
  /// </summary>
  private abstract class ReplayManyBase : ReplayBufferBase
  {
      /// <summary>Buffered values, oldest first.</summary>
      protected readonly Queue<T> _queue;

      /// <summary>
      /// Allocates the queue with an initial capacity of <paramref name="queueSize"/>, capped at 64.
      /// </summary>
      protected ReplayManyBase(int queueSize)
          : base()
      {
          var initialCapacity = queueSize < 64 ? queueSize : 64;
          _queue = new Queue<T>(initialCapacity);
      }

      /// <summary>Appends the value to the queue.</summary>
      protected override void Next(T value) => _queue.Enqueue(value);

      /// <summary>Sends every queued value to the observer, oldest first, and returns how many were sent.</summary>
      protected override int Replay(IObserver<T> observer)
      {
          var replayed = 0;

          foreach (var value in _queue)
          {
              observer.OnNext(value);
              replayed++;
          }

          return replayed;
      }

      /// <summary>Empties the queue.</summary>
      protected override void DisposeCore() => _queue.Clear();
  }
  551. }
  /// <summary>
  /// Specialized scheduled observer that behaves like a scheduled observer for the immediate
  /// scheduler: producers enqueue notifications under a gate, and whichever thread calls
  /// <see cref="EnsureActive()"/> while nobody else is draining delivers them synchronously.
  /// </summary>
  /// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
  internal sealed class FastImmediateObserver<T> : IScheduledObserver<T>
  {
      /// <summary>
      /// Gate guarding the queues and state flags, and controlling who owns the drain loop.
      /// </summary>
      private readonly object _gate = new object();

      /// <summary>
      /// Downstream observer; swapped for a no-op observer once the observer is done.
      /// </summary>
      private volatile IObserver<T> _observer;

      /// <summary>
      /// Queue that producers enqueue OnNext notifications into.
      /// </summary>
      private Queue<T> _queue = new Queue<T>();

      /// <summary>
      /// Standby queue swapped in for <see cref="_queue"/> when the drain loop takes ownership
      /// of the pending notifications, so queues are reused when a busy subject keeps
      /// producing while a drain is in progress.
      /// </summary>
      private Queue<T> _queue2;

      /// <summary>
      /// Exception received through <see cref="OnError"/>, if any.
      /// </summary>
      private Exception _error;

      /// <summary>
      /// Set once <see cref="OnCompleted"/> has been received.
      /// </summary>
      private bool _done;

      /// <summary>
      /// Set while some thread owns the drain loop.
      /// </summary>
      private bool _busy;

      /// <summary>
      /// Set when delivering notifications threw; all later work is dropped.
      /// </summary>
      private bool _hasFaulted;

      /// <summary>
      /// Creates a new scheduled observer that proxies to the specified observer.
      /// </summary>
      /// <param name="observer">Observer to forward notifications to.</param>
      public FastImmediateObserver(IObserver<T> observer)
      {
          _observer = observer;
      }

      /// <summary>
      /// Disposes the observer by detaching the downstream observer.
      /// </summary>
      public void Dispose()
      {
          Done();
      }

      /// <summary>
      /// Signals that work may be pending. Either the thread that currently owns the drain
      /// loop picks the work up, or the calling thread becomes the owner and drains the queue.
      /// </summary>
      public void EnsureActive()
      {
          EnsureActive(1);
      }

      /// <summary>
      /// Signals that work may be pending. Either the thread that currently owns the drain
      /// loop picks the work up, or the calling thread becomes the owner and drains the queue.
      /// </summary>
      /// <param name="count">The number of enqueued notifications to process (ignored).</param>
      public void EnsureActive(int count)
      {
          var becameOwner = false;

          lock (_gate)
          {
              //
              // After a fault all work is dropped. Otherwise, claim the drain loop if
              // nobody owns it yet.
              //
              if (!_hasFaulted && !_busy)
              {
                  _busy = true;
                  becameOwner = true;
              }
          }

          if (!becameOwner)
          {
              return;
          }

          while (true)
          {
              Queue<T> batch = null;
              Exception failure = null;
              var completed = false;

              //
              // Take the pending notifications away from the producer side.
              //
              lock (_gate)
              {
                  if (_queue.Count > 0)
                  {
                      //
                      // Hand the producers the standby queue (or a fresh one) and keep the
                      // filled queue for draining. The standby slot stays empty until the
                      // drained queue is put back below.
                      //
                      batch = _queue;
                      _queue = _queue2 ?? new Queue<T>();
                      _queue2 = null;
                  }

                  //
                  // Pick up a terminal notification, if one has arrived.
                  //
                  if (_error != null)
                  {
                      failure = _error;
                  }
                  else if (_done)
                  {
                      completed = true;
                  }
                  else if (batch == null)
                  {
                      //
                      // Nothing to do: give up ownership so a later caller can become
                      // the owner, and leave the loop.
                      //
                      _busy = false;
                      break;
                  }
              }

              try
              {
                  if (batch != null)
                  {
                      //
                      // Deliver the stolen OnNext notifications in order.
                      //
                      while (batch.Count > 0)
                      {
                          _observer.OnNext(batch.Dequeue());
                      }

                      //
                      // The drained queue is empty again; park it as the standby queue
                      // for the next swap.
                      //
                      lock (_gate)
                      {
                          _queue2 = batch;
                      }
                  }

                  //
                  // Deliver the terminal notification, if any. Ownership is deliberately
                  // not released here: leaving the loop while still marked busy stops all
                  // further processing by this observer.
                  //
                  if (failure != null)
                  {
                      Done().OnError(failure);
                      break;
                  }

                  if (completed)
                  {
                      Done().OnCompleted();
                      break;
                  }
              }
              catch
              {
                  //
                  // Delivery threw: mark the observer as faulted, discard whatever the
                  // producers queued in the meantime, and let the exception propagate.
                  //
                  lock (_gate)
                  {
                      _hasFaulted = true;
                      _queue.Clear();
                  }

                  throw;
              }
          }
      }

      /// <summary>
      /// Records an OnCompleted notification for the drain loop to deliver.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _done = true;
          }
      }

      /// <summary>
      /// Records an OnError notification for the drain loop to deliver.
      /// </summary>
      /// <param name="error">Error of the notification.</param>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _error = error;
          }
      }

      /// <summary>
      /// Enqueues an OnNext notification for the drain loop to deliver.
      /// </summary>
      /// <param name="value">Value of the notification.</param>
      public void OnNext(T value)
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _queue.Enqueue(value);
          }
      }

      /// <summary>
      /// Atomically replaces the downstream observer with a no-op observer so that no further
      /// notifications reach it.
      /// </summary>
      /// <returns>The observer that was attached before the swap; terminal notifications are sent to it.</returns>
      private IObserver<T> Done()
      {
          return Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
      }
  }
  798. }