ReplaySubject.cs 26 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Threading;
  5. namespace System.Reactive.Subjects
  6. {
  7. /// <summary>
  8. /// Represents an object that is both an observable sequence as well as an observer.
  9. /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
  10. /// </summary>
  11. /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  12. public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
  13. {
  14. private readonly IReplaySubjectImplementation _implementation;
  15. /// <summary>
  16. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size, window and scheduler.
  17. /// </summary>
  18. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  19. /// <param name="window">Maximum time length of the replay buffer.</param>
  20. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  21. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  22. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  23. public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
  24. {
  25. _implementation = new ReplayByTime(bufferSize, window, scheduler);
  26. }
  27. /// <summary>
  28. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size and window.
  29. /// </summary>
  30. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  31. /// <param name="window">Maximum time length of the replay buffer.</param>
  32. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  33. public ReplaySubject(int bufferSize, TimeSpan window)
  34. {
  35. _implementation = new ReplayByTime(bufferSize, window);
  36. }
  37. /// <summary>
  38. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class.
  39. /// </summary>
  40. public ReplaySubject()
  41. {
  42. _implementation = new ReplayAll();
  43. }
  44. /// <summary>
  45. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified scheduler.
  46. /// </summary>
  47. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  48. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  49. public ReplaySubject(IScheduler scheduler)
  50. {
  51. _implementation = new ReplayByTime(scheduler);
  52. }
  53. //TODO: Does this overload make any sense with the optimisations? Surely this now is just <c>new ReplaySubject<T>(bufferSize).SubscribeOn(scheduler)</c>?
  54. //Potentially should be marked as obsolete
  55. /// <summary>
  56. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size and scheduler.
  57. /// </summary>
  58. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  59. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  60. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  61. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  62. public ReplaySubject(int bufferSize, IScheduler scheduler)
  63. {
  64. _implementation = new ReplayByTime(bufferSize, scheduler);
  65. }
  66. /// <summary>
  67. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size.
  68. /// </summary>
  69. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  70. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  71. public ReplaySubject(int bufferSize)
  72. {
  73. switch (bufferSize)
  74. {
  75. case 1:
  76. _implementation = new ReplayOne();
  77. break;
  78. case int.MaxValue:
  79. _implementation = new ReplayAll();
  80. break;
  81. default:
  82. _implementation = new ReplayMany(bufferSize);
  83. break;
  84. }
  85. }
  86. /// <summary>
  87. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified window and scheduler.
  88. /// </summary>
  89. /// <param name="window">Maximum time length of the replay buffer.</param>
  90. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  91. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  92. /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  93. public ReplaySubject(TimeSpan window, IScheduler scheduler)
  94. {
  95. _implementation = new ReplayByTime(window, scheduler);
  96. }
  97. /// <summary>
  98. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified window.
  99. /// </summary>
  100. /// <param name="window">Maximum time length of the replay buffer.</param>
  101. /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  102. public ReplaySubject(TimeSpan window)
  103. {
  104. _implementation = new ReplayByTime(window);
  105. }
  106. /// <summary>
  107. /// Indicates whether the subject has observers subscribed to it.
  108. /// </summary>
  109. public bool HasObservers
  110. {
  111. get { return _implementation.HasObservers; }
  112. }
  113. /// <summary>
  114. /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
  115. /// </summary>
  116. /// <param name="value">The value to send to all observers.</param>
  117. public void OnNext(T value)
  118. {
  119. _implementation.OnNext(value);
  120. }
  121. /// <summary>
  122. /// Notifies all subscribed and future observers about the specified exception.
  123. /// </summary>
  124. /// <param name="error">The exception to send to all observers.</param>
  125. /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
  126. public void OnError(Exception error)
  127. {
  128. _implementation.OnError(error);
  129. }
  130. /// <summary>
  131. /// Notifies all subscribed and future observers about the end of the sequence.
  132. /// </summary>
  133. public void OnCompleted()
  134. {
  135. _implementation.OnCompleted();
  136. }
  137. /// <summary>
  138. /// Subscribes an observer to the subject.
  139. /// </summary>
  140. /// <param name="observer">Observer to subscribe to the subject.</param>
  141. /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
  142. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  143. public IDisposable Subscribe(IObserver<T> observer)
  144. {
  145. return _implementation.Subscribe(observer);
  146. }
  147. /// <summary>
  148. /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> class and unsubscribe all observers.
  149. /// </summary>
  150. public void Dispose()
  151. {
  152. _implementation.Dispose();
  153. }
  154. private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
  155. {
  156. bool HasObservers { get; }
  157. void Unsubscribe(IObserver<T> observer);
  158. }
  159. private class Subscription : IDisposable
  160. {
  161. private IReplaySubjectImplementation _subject;
  162. private IObserver<T> _observer;
  163. public Subscription(IReplaySubjectImplementation subject, IObserver<T> observer)
  164. {
  165. _subject = subject;
  166. _observer = observer;
  167. }
  168. public void Dispose()
  169. {
  170. var observer = Interlocked.Exchange(ref _observer, null);
  171. if (observer == null)
  172. return;
  173. _subject.Unsubscribe(observer);
  174. _subject = null;
  175. }
  176. }
  177. //Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
  178. private sealed class ReplayByTime : IReplaySubjectImplementation
  179. {
  180. private const int InfiniteBufferSize = int.MaxValue;
  181. private readonly int _bufferSize;
  182. private readonly TimeSpan _window;
  183. private readonly IScheduler _scheduler;
  184. private readonly IStopwatch _stopwatch;
  185. private readonly Queue<TimeInterval<T>> _queue;
  186. private bool _isStopped;
  187. private Exception _error;
  188. private ImmutableList<ScheduledObserver<T>> _observers;
  189. private bool _isDisposed;
  190. private readonly object _gate = new object();
  191. public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
  192. {
  193. if (bufferSize < 0)
  194. throw new ArgumentOutOfRangeException("bufferSize");
  195. if (window < TimeSpan.Zero)
  196. throw new ArgumentOutOfRangeException("window");
  197. if (scheduler == null)
  198. throw new ArgumentNullException("scheduler");
  199. _bufferSize = bufferSize;
  200. _window = window;
  201. _scheduler = scheduler;
  202. _stopwatch = _scheduler.StartStopwatch();
  203. _queue = new Queue<TimeInterval<T>>();
  204. _isStopped = false;
  205. _error = null;
  206. _observers = new ImmutableList<ScheduledObserver<T>>();
  207. }
  208. public ReplayByTime(int bufferSize, TimeSpan window)
  209. : this(bufferSize, window, SchedulerDefaults.Iteration)
  210. {
  211. }
  212. public ReplayByTime(IScheduler scheduler)
  213. : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
  214. {
  215. }
  216. public ReplayByTime(int bufferSize, IScheduler scheduler)
  217. : this(bufferSize, TimeSpan.MaxValue, scheduler)
  218. {
  219. }
  220. public ReplayByTime(TimeSpan window, IScheduler scheduler)
  221. : this(InfiniteBufferSize, window, scheduler)
  222. {
  223. }
  224. public ReplayByTime(TimeSpan window)
  225. : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
  226. {
  227. }
  228. public bool HasObservers
  229. {
  230. get
  231. {
  232. var observers = _observers;
  233. return observers != null && observers.Data.Length > 0;
  234. }
  235. }
  236. private void Trim(TimeSpan now)
  237. {
  238. while (_queue.Count > _bufferSize)
  239. _queue.Dequeue();
  240. while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
  241. _queue.Dequeue();
  242. }
  243. public void OnNext(T value)
  244. {
  245. var o = default(ScheduledObserver<T>[]);
  246. lock (_gate)
  247. {
  248. CheckDisposed();
  249. if (!_isStopped)
  250. {
  251. var now = _stopwatch.Elapsed;
  252. _queue.Enqueue(new TimeInterval<T>(value, now));
  253. Trim(now);
  254. o = _observers.Data;
  255. foreach (var observer in o)
  256. observer.OnNext(value);
  257. }
  258. }
  259. if (o != null)
  260. foreach (var observer in o)
  261. observer.EnsureActive();
  262. }
  263. public void OnError(Exception error)
  264. {
  265. if (error == null)
  266. throw new ArgumentNullException("error");
  267. var o = default(ScheduledObserver<T>[]);
  268. lock (_gate)
  269. {
  270. CheckDisposed();
  271. if (!_isStopped)
  272. {
  273. var now = _stopwatch.Elapsed;
  274. _isStopped = true;
  275. _error = error;
  276. Trim(now);
  277. o = _observers.Data;
  278. foreach (var observer in o)
  279. observer.OnError(error);
  280. _observers = new ImmutableList<ScheduledObserver<T>>();
  281. }
  282. }
  283. if (o != null)
  284. foreach (var observer in o)
  285. observer.EnsureActive();
  286. }
  287. public void OnCompleted()
  288. {
  289. var o = default(ScheduledObserver<T>[]);
  290. lock (_gate)
  291. {
  292. CheckDisposed();
  293. if (!_isStopped)
  294. {
  295. var now = _stopwatch.Elapsed;
  296. _isStopped = true;
  297. Trim(now);
  298. o = _observers.Data;
  299. foreach (var observer in o)
  300. observer.OnCompleted();
  301. _observers = new ImmutableList<ScheduledObserver<T>>();
  302. }
  303. }
  304. if (o != null)
  305. foreach (var observer in o)
  306. observer.EnsureActive();
  307. }
  308. public IDisposable Subscribe(IObserver<T> observer)
  309. {
  310. if (observer == null)
  311. throw new ArgumentNullException("observer");
  312. var so = new ScheduledObserver<T>(_scheduler, observer);
  313. var n = 0;
  314. var subscription = new RemovableDisposable(this, so);
  315. lock (_gate)
  316. {
  317. CheckDisposed();
  318. //
  319. // Notice the v1.x behavior of always calling Trim is preserved here.
  320. //
  321. // This may be subject (pun intended) of debate: should this policy
  322. // only be applied while the sequence is active? With the current
  323. // behavior, a sequence will "die out" after it has terminated by
  324. // continuing to drop OnNext notifications from the queue.
  325. //
  326. // In v1.x, this behavior was due to trimming based on the clock value
  327. // returned by scheduler.Now, applied to all but the terminal message
  328. // in the queue. Using the IStopwatch has the same effect. Either way,
  329. // we guarantee the final notification will be observed, but there's
  330. // no way to retain the buffer directly. One approach is to use the
  331. // time-based TakeLast operator and apply an unbounded ReplaySubject
  332. // to it.
  333. //
  334. // To conclude, we're keeping the behavior as-is for compatibility
  335. // reasons with v1.x.
  336. //
  337. Trim(_stopwatch.Elapsed);
  338. _observers = _observers.Add(so);
  339. n = _queue.Count;
  340. foreach (var item in _queue)
  341. so.OnNext(item.Value);
  342. if (_error != null)
  343. {
  344. n++;
  345. so.OnError(_error);
  346. }
  347. else if (_isStopped)
  348. {
  349. n++;
  350. so.OnCompleted();
  351. }
  352. }
  353. so.EnsureActive(n);
  354. return subscription;
  355. }
  356. private void Unsubscribe(ScheduledObserver<T> observer)
  357. {
  358. lock (_gate)
  359. {
  360. observer.Dispose();
  361. if (!_isDisposed)
  362. _observers = _observers.Remove(observer);
  363. }
  364. }
  365. void IReplaySubjectImplementation.Unsubscribe(IObserver<T> observer)
  366. {
  367. var so = (ScheduledObserver<T>)observer;
  368. Unsubscribe(so);
  369. }
  370. sealed class RemovableDisposable : IDisposable
  371. {
  372. private readonly ReplayByTime _subject;
  373. private readonly ScheduledObserver<T> _observer;
  374. public RemovableDisposable(ReplayByTime subject, ScheduledObserver<T> observer)
  375. {
  376. _subject = subject;
  377. _observer = observer;
  378. }
  379. public void Dispose()
  380. {
  381. _observer.Dispose();
  382. _subject.Unsubscribe(_observer);
  383. }
  384. }
  385. private void CheckDisposed()
  386. {
  387. if (_isDisposed)
  388. throw new ObjectDisposedException(string.Empty);
  389. }
  390. public void Dispose()
  391. {
  392. lock (_gate)
  393. {
  394. _isDisposed = true;
  395. _observers = null;
  396. _queue.Clear();
  397. }
  398. }
  399. }
  400. //Below are the non-time based implementations.
  401. //These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
  402. //The ReplayOne implementation also removes the need to even have a queue.
  403. private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
  404. {
  405. private bool _hasValue;
  406. private T _value;
  407. protected override void Trim()
  408. {
  409. //NoOp. No need to trim.
  410. }
  411. protected override void AddValueToBuffer(T value)
  412. {
  413. _hasValue = true;
  414. _value = value;
  415. }
  416. protected override void ReplayBuffer(IObserver<T> observer)
  417. {
  418. if (_hasValue)
  419. observer.OnNext(_value);
  420. }
  421. protected override void Dispose(bool disposing)
  422. {
  423. base.Dispose(disposing);
  424. _value = default(T);
  425. }
  426. }
  427. private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
  428. {
  429. private readonly int _bufferSize;
  430. public ReplayMany(int bufferSize)
  431. : base(bufferSize)
  432. {
  433. _bufferSize = bufferSize;
  434. }
  435. protected override void Trim()
  436. {
  437. while (Queue.Count > _bufferSize)
  438. Queue.Dequeue();
  439. }
  440. }
  441. private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
  442. {
  443. public ReplayAll()
  444. : base(0)
  445. {
  446. }
  447. protected override void Trim()
  448. {
  449. //NoOp; i.e. Dont' trim, keep all values.
  450. }
  451. }
  452. private abstract class ReplayBufferBase : IReplaySubjectImplementation
  453. {
  454. private readonly object _gate = new object();
  455. private bool _isDisposed;
  456. private bool _isStopped;
  457. private Exception _error;
  458. private ImmutableList<IObserver<T>> _observers;
  459. protected ReplayBufferBase()
  460. {
  461. _observers = new ImmutableList<IObserver<T>>();
  462. }
  463. protected abstract void Trim();
  464. protected abstract void AddValueToBuffer(T value);
  465. protected abstract void ReplayBuffer(IObserver<T> observer);
  466. public bool HasObservers
  467. {
  468. get
  469. {
  470. var observers = _observers;
  471. return observers != null && observers.Data.Length > 0;
  472. }
  473. }
  474. public void OnNext(T value)
  475. {
  476. lock (_gate)
  477. {
  478. CheckDisposed();
  479. if (!_isStopped)
  480. {
  481. AddValueToBuffer(value);
  482. Trim();
  483. var o = _observers.Data;
  484. foreach (var observer in o)
  485. observer.OnNext(value);
  486. }
  487. }
  488. }
  489. public void OnError(Exception error)
  490. {
  491. if (error == null)
  492. throw new ArgumentNullException("error");
  493. lock (_gate)
  494. {
  495. CheckDisposed();
  496. if (!_isStopped)
  497. {
  498. _isStopped = true;
  499. _error = error;
  500. Trim();
  501. var o = _observers.Data;
  502. foreach (var observer in o)
  503. observer.OnError(error);
  504. _observers = new ImmutableList<IObserver<T>>();
  505. }
  506. }
  507. }
  508. public void OnCompleted()
  509. {
  510. lock (_gate)
  511. {
  512. CheckDisposed();
  513. if (!_isStopped)
  514. {
  515. _isStopped = true;
  516. Trim();
  517. var o = _observers.Data;
  518. foreach (var observer in o)
  519. observer.OnCompleted();
  520. _observers = new ImmutableList<IObserver<T>>();
  521. }
  522. }
  523. }
  524. public IDisposable Subscribe(IObserver<T> observer)
  525. {
  526. if (observer == null)
  527. throw new ArgumentNullException("observer");
  528. var subscription = new Subscription(this, observer);
  529. lock (_gate)
  530. {
  531. CheckDisposed();
  532. //
  533. // Notice the v1.x behavior of always calling Trim is preserved here.
  534. //
  535. // This may be subject (pun intended) of debate: should this policy
  536. // only be applied while the sequence is active? With the current
  537. // behavior, a sequence will "die out" after it has terminated by
  538. // continuing to drop OnNext notifications from the queue.
  539. //
  540. // In v1.x, this behavior was due to trimming based on the clock value
  541. // returned by scheduler.Now, applied to all but the terminal message
  542. // in the queue. Using the IStopwatch has the same effect. Either way,
  543. // we guarantee the final notification will be observed, but there's
  544. // no way to retain the buffer directly. One approach is to use the
  545. // time-based TakeLast operator and apply an unbounded ReplaySubject
  546. // to it.
  547. //
  548. // To conclude, we're keeping the behavior as-is for compatibility
  549. // reasons with v1.x.
  550. //
  551. _observers = _observers.Add(observer);
  552. ReplayBuffer(observer);
  553. if (_error != null)
  554. {
  555. observer.OnError(_error);
  556. }
  557. else if (_isStopped)
  558. {
  559. observer.OnCompleted();
  560. }
  561. }
  562. return subscription;
  563. }
  564. public void Unsubscribe(IObserver<T> observer)
  565. {
  566. lock (_gate)
  567. {
  568. if (!_isDisposed)
  569. _observers = _observers.Remove(observer);
  570. }
  571. }
  572. private void CheckDisposed()
  573. {
  574. if (_isDisposed)
  575. throw new ObjectDisposedException(string.Empty);
  576. }
  577. public void Dispose()
  578. {
  579. Dispose(true);
  580. }
  581. protected virtual void Dispose(bool disposing)
  582. {
  583. lock (_gate)
  584. {
  585. _isDisposed = true;
  586. _observers = null;
  587. }
  588. }
  589. }
  590. private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
  591. {
  592. private readonly Queue<T> _queue;
  593. protected ReplayManyBase(int queueSize)
  594. : base()
  595. {
  596. _queue = new Queue<T>(queueSize);
  597. }
  598. protected Queue<T> Queue { get { return _queue; } }
  599. protected override void AddValueToBuffer(T value)
  600. {
  601. _queue.Enqueue(value);
  602. }
  603. protected override void ReplayBuffer(IObserver<T> observer)
  604. {
  605. foreach (var item in _queue)
  606. observer.OnNext(item);
  607. }
  608. protected override void Dispose(bool disposing)
  609. {
  610. base.Dispose(disposing);
  611. _queue.Clear();
  612. }
  613. }
  614. }
  615. }