// ReplaySubject.cs
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Threading;
  5. namespace System.Reactive.Subjects
  6. {
  7. /// <summary>
  8. /// Represents an object that is both an observable sequence as well as an observer.
  9. /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
  10. /// </summary>
  11. /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  12. public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
  13. {
  14. #region Fields
  15. /// <summary>
  16. /// Underlying optimized implementation of the replay subject.
  17. /// </summary>
  18. private readonly IReplaySubjectImplementation _implementation;
  19. #endregion
  20. #region Constructors
  21. #region All
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class that replays every buffered notification.
  /// </summary>
  public ReplaySubject()
  {
      // ReplayAll never trims its buffer, so no size or time limit applies.
      _implementation = new ReplayAll();
  }
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class whose observers are invoked on the given scheduler.
  /// </summary>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public ReplaySubject(IScheduler scheduler)
  {
      // A scheduler requires the time-based implementation; it validates the argument itself.
      _implementation = new ReplayByTime(scheduler);
  }
  38. #endregion
  39. #region Count
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize)
  {
      // Validate up front so the documented exception names the caller's parameter
      // instead of surfacing from the internal queue allocation.
      if (bufferSize < 0)
      {
          throw new ArgumentOutOfRangeException("bufferSize");
      }

      // Pick the cheapest implementation that honors the requested limit.
      switch (bufferSize)
      {
          case 1:
              // A single slot suffices; no queue is needed.
              _implementation = new ReplayOne();
              break;
          case int.MaxValue:
              // Treated as "unbounded": nothing is ever trimmed.
              _implementation = new ReplayAll();
              break;
          default:
              _implementation = new ReplayMany(bufferSize);
              break;
      }
  }
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class limited to a number of buffered elements, invoking observers on the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize, IScheduler scheduler)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      _implementation = new ReplayByTime(bufferSize, scheduler);
  }
  71. #endregion
  72. #region Time
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class that replays only elements no older than the specified window.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(TimeSpan window)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      _implementation = new ReplayByTime(window);
  }
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class limited to a time window, invoking observers on the given scheduler.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(TimeSpan window, IScheduler scheduler)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      _implementation = new ReplayByTime(window, scheduler);
  }
  93. #endregion
  94. #region Count & Time
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class limited by both element count and time window.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      _implementation = new ReplayByTime(bufferSize, window);
  }
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class limited by both element count and time window, invoking observers on the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      _implementation = new ReplayByTime(bufferSize, window, scheduler);
  }
  117. #endregion
  118. #endregion
  119. #region Properties
  /// <summary>
  /// Gets a value indicating whether at least one observer is currently subscribed to the subject.
  /// </summary>
  public bool HasObservers
  {
      get
      {
          // Answered by whichever implementation the constructor selected.
          return _implementation.HasObservers;
      }
  }
  127. #endregion
  128. #region Methods
  129. #region Observer implementation
  /// <summary>
  /// Publishes the specified element to all current observers and records it for replay to future observers.
  /// </summary>
  /// <param name="value">The value to send to all observers.</param>
  public void OnNext(T value)
  {
      // Pure delegation to the selected implementation.
      _implementation.OnNext(value);
  }
  /// <summary>
  /// Terminates the sequence with the specified exception, notifying all subscribed and future observers.
  /// </summary>
  /// <param name="error">The exception to send to all observers.</param>
  /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
  public void OnError(Exception error)
  {
      // The null check is performed by the implementation.
      _implementation.OnError(error);
  }
  /// <summary>
  /// Terminates the sequence successfully, notifying all subscribed and future observers.
  /// </summary>
  public void OnCompleted()
  {
      // Pure delegation to the selected implementation.
      _implementation.OnCompleted();
  }
  154. #endregion
  155. #region IObservable implementation
  /// <summary>
  /// Subscribes an observer to the subject; the buffered notifications are replayed to it first.
  /// </summary>
  /// <param name="observer">Observer to subscribe to the subject.</param>
  /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public IDisposable Subscribe(IObserver<T> observer)
  {
      // The null check and the replay are performed by the implementation.
      var subscription = _implementation.Subscribe(observer);
      return subscription;
  }
  166. #endregion
  167. #region IDisposable implementation
  /// <summary>
  /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> class and unsubscribes all observers.
  /// </summary>
  public void Dispose()
  {
      // The implementation drops its observers and its buffer.
      _implementation.Dispose();
  }
  175. #endregion
  176. #endregion
  /// <summary>
  /// Contract shared by the buffering strategies that <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> delegates to.
  /// </summary>
  private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
  {
      /// <summary>
      /// Indicates whether the implementation currently has observers subscribed to it.
      /// </summary>
      bool HasObservers { get; }

      /// <summary>
      /// Detaches the specified observer from the implementation.
      /// </summary>
      /// <param name="observer">Observer to detach.</param>
      void Unsubscribe(IObserver<T> observer);
  }
  /// <summary>
  /// Disposable handed out by the non-time based implementations; disposing it detaches the observer from its subject exactly once.
  /// </summary>
  private class Subscription : IDisposable
  {
      private IReplaySubjectImplementation _subject;
      private IObserver<T> _observer;

      /// <summary>
      /// Creates a subscription tying <paramref name="observer"/> to <paramref name="subject"/>.
      /// </summary>
      /// <param name="subject">Implementation the observer is subscribed to.</param>
      /// <param name="observer">Observer to detach when the subscription is disposed.</param>
      public Subscription(IReplaySubjectImplementation subject, IObserver<T> observer)
      {
          _subject = subject;
          _observer = observer;
      }

      /// <summary>
      /// Unsubscribes the observer from the subject. Subsequent calls do nothing.
      /// </summary>
      public void Dispose()
      {
          // The atomic swap lets only the first caller see a non-null observer.
          var detached = Interlocked.Exchange(ref _observer, null);
          if (detached != null)
          {
              _subject.Unsubscribe(detached);
              _subject = null;
          }
      }
  }
  /// <summary>
  /// Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
  /// </summary>
  private sealed class ReplayByTime : IReplaySubjectImplementation
  {
      // Buffer size used by the overloads that do not limit the element count.
      private const int InfiniteBufferSize = int.MaxValue;

      private readonly int _bufferSize;
      private readonly TimeSpan _window;
      private readonly IScheduler _scheduler;
      private readonly IStopwatch _stopwatch;

      // Buffered values, each paired with the stopwatch reading taken on arrival.
      private readonly Queue<TimeInterval<T>> _queue;

      private bool _isStopped;
      private Exception _error;
      private ImmutableList<IScheduledObserver<T>> _observers;
      private bool _isDisposed;

      // Taken by every method that mutates the state above.
      private readonly object _gate = new object();

      /// <summary>
      /// Creates a replay buffer limited by element count and age whose observers are invoked on the given scheduler.
      /// </summary>
      /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
      /// <param name="window">Maximum time length of the replay buffer.</param>
      /// <param name="scheduler">Scheduler the observers are invoked on.</param>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
      public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
      {
          if (bufferSize < 0)
              throw new ArgumentOutOfRangeException("bufferSize");
          if (window < TimeSpan.Zero)
              throw new ArgumentOutOfRangeException("window");
          if (scheduler == null)
              throw new ArgumentNullException("scheduler");

          _bufferSize = bufferSize;
          _window = window;
          _scheduler = scheduler;

          _stopwatch = _scheduler.StartStopwatch();
          _queue = new Queue<TimeInterval<T>>();

          _isStopped = false;
          _error = null;

          _observers = ImmutableList<IScheduledObserver<T>>.Empty;
      }

      /// <summary>
      /// Creates a replay buffer limited by element count and age, using the default iteration scheduler.
      /// </summary>
      public ReplayByTime(int bufferSize, TimeSpan window)
          : this(bufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      /// <summary>
      /// Creates a replay buffer with no count or age limit on the given scheduler.
      /// </summary>
      public ReplayByTime(IScheduler scheduler)
          : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      /// <summary>
      /// Creates a replay buffer limited by element count only on the given scheduler.
      /// </summary>
      public ReplayByTime(int bufferSize, IScheduler scheduler)
          : this(bufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      /// <summary>
      /// Creates a replay buffer limited by age only on the given scheduler.
      /// </summary>
      public ReplayByTime(TimeSpan window, IScheduler scheduler)
          : this(InfiniteBufferSize, window, scheduler)
      {
      }

      /// <summary>
      /// Creates a replay buffer limited by age only, using the default iteration scheduler.
      /// </summary>
      public ReplayByTime(TimeSpan window)
          : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      /// <summary>
      /// Indicates whether any observers are currently registered.
      /// </summary>
      public bool HasObservers
      {
          get
          {
              // Read once into a local: Dispose sets the field to null.
              var observers = _observers;
              return observers != null && observers.Data.Length > 0;
          }
      }

      /// <summary>
      /// Drops buffered elements that exceed the count limit, then those older than the window.
      /// </summary>
      /// <param name="now">Current stopwatch reading.</param>
      private void Trim(TimeSpan now)
      {
          while (_queue.Count > _bufferSize)
              _queue.Dequeue();

          while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
              _queue.Dequeue();
      }

      /// <summary>
      /// Buffers the value and hands it to every registered observer.
      /// </summary>
      /// <param name="value">The value to send to all observers.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnNext(T value)
      {
          var o = default(IScheduledObserver<T>[]);

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  var now = _stopwatch.Elapsed;
                  _queue.Enqueue(new TimeInterval<T>(value, now));
                  Trim(now);

                  o = _observers.Data;
                  foreach (var observer in o)
                      observer.OnNext(value);
              }
          }

          // EnsureActive is deliberately called after the gate has been released.
          if (o != null)
              foreach (var observer in o)
                  observer.EnsureActive();
      }

      /// <summary>
      /// Terminates the sequence with an error and clears the observer list.
      /// </summary>
      /// <param name="error">The exception to send to all observers.</param>
      /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnError(Exception error)
      {
          if (error == null)
              throw new ArgumentNullException("error");

          var o = default(IScheduledObserver<T>[]);

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  var now = _stopwatch.Elapsed;
                  _isStopped = true;
                  _error = error;
                  Trim(now);

                  o = _observers.Data;
                  foreach (var observer in o)
                      observer.OnError(error);

                  _observers = ImmutableList<IScheduledObserver<T>>.Empty;
              }
          }

          // EnsureActive is deliberately called after the gate has been released.
          if (o != null)
              foreach (var observer in o)
                  observer.EnsureActive();
      }

      /// <summary>
      /// Terminates the sequence successfully and clears the observer list.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnCompleted()
      {
          var o = default(IScheduledObserver<T>[]);

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  var now = _stopwatch.Elapsed;
                  _isStopped = true;
                  Trim(now);

                  o = _observers.Data;
                  foreach (var observer in o)
                      observer.OnCompleted();

                  _observers = ImmutableList<IScheduledObserver<T>>.Empty;
              }
          }

          // EnsureActive is deliberately called after the gate has been released.
          if (o != null)
              foreach (var observer in o)
                  observer.EnsureActive();
      }

      /// <summary>
      /// Subscribes an observer: replays the buffer and, if the sequence has already ended, the terminal notification.
      /// </summary>
      /// <param name="observer">Observer to subscribe to the subject.</param>
      /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (observer == null)
              throw new ArgumentNullException("observer");

          var so = new ScheduledObserver<T>(_scheduler, observer);

          // Number of notifications queued on the scheduled observer below.
          var n = 0;

          var subscription = new RemovableDisposable(this, so);

          lock (_gate)
          {
              CheckDisposed();

              //
              // Notice the v1.x behavior of always calling Trim is preserved here.
              //
              // This may be subject (pun intended) of debate: should this policy
              // only be applied while the sequence is active? With the current
              // behavior, a sequence will "die out" after it has terminated by
              // continuing to drop OnNext notifications from the queue.
              //
              // In v1.x, this behavior was due to trimming based on the clock value
              // returned by scheduler.Now, applied to all but the terminal message
              // in the queue. Using the IStopwatch has the same effect. Either way,
              // we guarantee the final notification will be observed, but there's
              // no way to retain the buffer directly. One approach is to use the
              // time-based TakeLast operator and apply an unbounded ReplaySubject
              // to it.
              //
              // To conclude, we're keeping the behavior as-is for compatibility
              // reasons with v1.x.
              //
              Trim(_stopwatch.Elapsed);

              // Only register while the sequence is live. A subscriber arriving after
              // termination gets the replay plus the terminal notification below and can
              // never receive anything else; registering it would retain it and make
              // HasObservers report true on a terminated subject.
              if (!_isStopped)
                  _observers = _observers.Add(so);

              n = _queue.Count;
              foreach (var item in _queue)
                  so.OnNext(item.Value);

              if (_error != null)
              {
                  n++;
                  so.OnError(_error);
              }
              else if (_isStopped)
              {
                  n++;
                  so.OnCompleted();
              }
          }

          so.EnsureActive(n);

          return subscription;
      }

      /// <summary>
      /// Disposes the scheduled observer and removes it from the observer list unless the subject is disposed.
      /// </summary>
      private void Unsubscribe(IScheduledObserver<T> observer)
      {
          lock (_gate)
          {
              observer.Dispose();

              // After Dispose the list is null; there is nothing left to remove from.
              if (!_isDisposed)
              {
                  _observers = _observers.Remove(observer);
              }
          }
      }

      /// <summary>
      /// Interface entry point; the observer must be one of the scheduled observers created by <see cref="Subscribe"/>.
      /// </summary>
      void IReplaySubjectImplementation.Unsubscribe(IObserver<T> observer)
      {
          var so = (IScheduledObserver<T>)observer;
          Unsubscribe(so);
      }

      /// <summary>
      /// Subscription handle returned by <see cref="Subscribe"/>.
      /// </summary>
      sealed class RemovableDisposable : IDisposable
      {
          private readonly ReplayByTime _subject;
          private readonly IScheduledObserver<T> _observer;

          public RemovableDisposable(ReplayByTime subject, IScheduledObserver<T> observer)
          {
              _subject = subject;
              _observer = observer;
          }

          /// <summary>
          /// Disposes the scheduled observer and detaches it from the subject.
          /// </summary>
          public void Dispose()
          {
              _observer.Dispose();
              _subject.Unsubscribe(_observer);
          }
      }

      /// <summary>
      /// Throws if the subject has been disposed.
      /// </summary>
      private void CheckDisposed()
      {
          if (_isDisposed)
              throw new ObjectDisposedException(string.Empty);
      }

      /// <summary>
      /// Marks the subject as disposed, drops all observers and clears the buffer.
      /// </summary>
      public void Dispose()
      {
          lock (_gate)
          {
              _isDisposed = true;
              _observers = null;
              _queue.Clear();
          }
      }
  }
  //
  // Below are the non-time based implementations.
  // These remove the need for the scheduler indirection, scheduled observers, the stopwatch, TimeInterval, and ensuring the scheduled observers are active after each action.
  // The ReplayOne implementation also removes the need to even have a queue.
  //
  /// <summary>
  /// Replay implementation that remembers only the most recent value, held in a single field instead of a queue.
  /// </summary>
  private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
  {
      private bool _hasValue;
      private T _value;

      /// <summary>
      /// Nothing to trim: each new value overwrites the single slot.
      /// </summary>
      protected override void Trim()
      {
      }

      /// <summary>
      /// Stores <paramref name="value"/> as the value to replay.
      /// </summary>
      protected override void AddValueToBuffer(T value)
      {
          _value = value;
          _hasValue = true;
      }

      /// <summary>
      /// Sends the stored value to <paramref name="observer"/> if one has been recorded.
      /// </summary>
      protected override void ReplayBuffer(IObserver<T> observer)
      {
          if (!_hasValue)
          {
              return;
          }

          observer.OnNext(_value);
      }

      /// <summary>
      /// Disposes the base state, then releases the stored value.
      /// </summary>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);
          _value = default(T);
      }
  }
  /// <summary>
  /// Replay implementation that keeps at most a fixed number of the most recent values.
  /// </summary>
  private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
  {
      private readonly int _bufferSize;

      /// <summary>
      /// Creates the buffer with the given element limit.
      /// </summary>
      /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
      public ReplayMany(int bufferSize)
          : base(bufferSize)
      {
          _bufferSize = bufferSize;
      }

      /// <summary>
      /// Discards the oldest values until the queue holds no more than the limit.
      /// </summary>
      protected override void Trim()
      {
          var buffer = Queue;
          while (buffer.Count > _bufferSize)
          {
              buffer.Dequeue();
          }
      }
  }
  /// <summary>
  /// Replay implementation that retains every value it has seen.
  /// </summary>
  private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
  {
      /// <summary>
      /// Creates the buffer, requesting an initial queue size of zero.
      /// </summary>
      public ReplayAll()
          : base(0)
      {
      }

      /// <summary>
      /// Intentionally empty: all values are kept.
      /// </summary>
      protected override void Trim()
      {
      }
  }
  /// <summary>
  /// Shared logic of the non-time based implementations: observer bookkeeping, termination state and locking.
  /// Derived classes supply the buffer through <see cref="AddValueToBuffer"/>, <see cref="Trim"/> and <see cref="ReplayBuffer"/>.
  /// </summary>
  private abstract class ReplayBufferBase : IReplaySubjectImplementation
  {
      // Taken by every method that mutates the state below or touches the derived buffer.
      private readonly object _gate = new object();

      private bool _isDisposed;
      private bool _isStopped;
      private Exception _error;
      private ImmutableList<IObserver<T>> _observers;

      protected ReplayBufferBase()
      {
          _observers = ImmutableList<IObserver<T>>.Empty;
      }

      /// <summary>
      /// Applies the buffer's trimming policy. Called with the gate held.
      /// </summary>
      protected abstract void Trim();

      /// <summary>
      /// Records a value in the buffer. Called with the gate held.
      /// </summary>
      protected abstract void AddValueToBuffer(T value);

      /// <summary>
      /// Sends the buffered values to a newly subscribed observer. Called with the gate held.
      /// </summary>
      protected abstract void ReplayBuffer(IObserver<T> observer);

      /// <summary>
      /// Indicates whether any observers are currently registered.
      /// </summary>
      public bool HasObservers
      {
          get
          {
              // Read once into a local: Dispose sets the field to null.
              var observers = _observers;
              return observers != null && observers.Data.Length > 0;
          }
      }

      /// <summary>
      /// Buffers the value and forwards it to every registered observer.
      /// </summary>
      /// <param name="value">The value to send to all observers.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnNext(T value)
      {
          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  AddValueToBuffer(value);
                  Trim();

                  var o = _observers.Data;
                  foreach (var observer in o)
                      observer.OnNext(value);
              }
          }
      }

      /// <summary>
      /// Terminates the sequence with an error and clears the observer list.
      /// </summary>
      /// <param name="error">The exception to send to all observers.</param>
      /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnError(Exception error)
      {
          if (error == null)
              throw new ArgumentNullException("error");

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  _isStopped = true;
                  _error = error;
                  Trim();

                  var o = _observers.Data;
                  foreach (var observer in o)
                      observer.OnError(error);

                  _observers = ImmutableList<IObserver<T>>.Empty;
              }
          }
      }

      /// <summary>
      /// Terminates the sequence successfully and clears the observer list.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnCompleted()
      {
          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  _isStopped = true;
                  Trim();

                  var o = _observers.Data;
                  foreach (var observer in o)
                      observer.OnCompleted();

                  _observers = ImmutableList<IObserver<T>>.Empty;
              }
          }
      }

      /// <summary>
      /// Subscribes an observer: replays the buffer and, if the sequence has already ended, the terminal notification.
      /// </summary>
      /// <param name="observer">Observer to subscribe to the subject.</param>
      /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (observer == null)
              throw new ArgumentNullException("observer");

          var subscription = new Subscription(this, observer);

          lock (_gate)
          {
              CheckDisposed();

              //
              // Notice the v1.x behavior of always calling Trim is preserved here,
              // even after the sequence has terminated. We're keeping the behavior
              // as-is for compatibility reasons with v1.x.
              //
              Trim();

              // Only register while the sequence is live. A subscriber arriving after
              // termination gets the replay plus the terminal notification below and can
              // never receive anything else; registering it would retain it and make
              // HasObservers report true on a terminated subject.
              if (!_isStopped)
                  _observers = _observers.Add(observer);

              ReplayBuffer(observer);

              if (_error != null)
              {
                  observer.OnError(_error);
              }
              else if (_isStopped)
              {
                  observer.OnCompleted();
              }
          }

          return subscription;
      }

      /// <summary>
      /// Removes the observer from the observer list unless the subject has been disposed.
      /// </summary>
      /// <param name="observer">Observer to detach.</param>
      public void Unsubscribe(IObserver<T> observer)
      {
          lock (_gate)
          {
              // After Dispose the list is null; there is nothing left to remove from.
              if (!_isDisposed)
              {
                  _observers = _observers.Remove(observer);
              }
          }
      }

      /// <summary>
      /// Throws if the subject has been disposed.
      /// </summary>
      private void CheckDisposed()
      {
          if (_isDisposed)
              throw new ObjectDisposedException(string.Empty);
      }

      /// <summary>
      /// Marks the subject as disposed and drops all observers and buffered values.
      /// </summary>
      public void Dispose()
      {
          // Hold the gate across the virtual call so that the buffer cleanup done by
          // derived overrides after base.Dispose(disposing) cannot run concurrently with
          // OnNext or Subscribe touching the same buffer. The lock is re-entrant, so the
          // nested lock in Dispose(bool) is harmless.
          lock (_gate)
          {
              Dispose(true);
          }
      }

      /// <summary>
      /// Marks the subject as disposed and drops the observer list. Overrides release their buffer after calling the base.
      /// </summary>
      protected virtual void Dispose(bool disposing)
      {
          lock (_gate)
          {
              _isDisposed = true;
              _observers = null;
          }
      }
  }
  /// <summary>
  /// Base class for the queue-backed, non-time based implementations. Owns the queue; derived classes only decide how it is trimmed.
  /// </summary>
  private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
  {
      // Upper bound on the eagerly allocated queue capacity. The queue still grows on
      // demand, so this limits up-front allocation without limiting the buffer itself.
      private const int MaxInitialCapacity = 64;

      private readonly Queue<T> _queue;

      /// <summary>
      /// Creates the backing queue.
      /// </summary>
      /// <param name="queueSize">
      /// Expected number of buffered elements. Used only as an initial capacity hint and
      /// capped, so that a large replay limit does not allocate (or fail to allocate) a
      /// huge array before any element has been buffered.
      /// </param>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="queueSize"/> is less than zero.</exception>
      protected ReplayManyBase(int queueSize)
          : base()
      {
          // A negative size stays negative after Math.Min and is still rejected by Queue<T>.
          _queue = new Queue<T>(Math.Min(queueSize, MaxInitialCapacity));
      }

      /// <summary>
      /// Gets the backing queue so that derived classes can trim it.
      /// </summary>
      protected Queue<T> Queue
      {
          get
          {
              return _queue;
          }
      }

      /// <summary>
      /// Appends <paramref name="value"/> to the queue.
      /// </summary>
      protected override void AddValueToBuffer(T value)
      {
          _queue.Enqueue(value);
      }

      /// <summary>
      /// Sends every queued value to <paramref name="observer"/>, oldest first.
      /// </summary>
      protected override void ReplayBuffer(IObserver<T> observer)
      {
          foreach (var item in _queue)
              observer.OnNext(item);
      }

      /// <summary>
      /// Disposes the base state, then empties the queue.
      /// </summary>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);
          _queue.Clear();
      }
  }
  656. }
  657. }