1
0

ReplaySubject.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. namespace System.Reactive.Subjects
  5. {
  6. /// <summary>
  7. /// Represents an object that is both an observable sequence as well as an observer.
  8. /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
  9. /// </summary>
  10. /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  11. public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
  12. {
  13. private const int InfiniteBufferSize = int.MaxValue;
  14. private readonly int _bufferSize;
  15. private readonly TimeSpan _window;
  16. private readonly IScheduler _scheduler;
  17. private readonly IStopwatch _stopwatch;
  18. private readonly Queue<TimeInterval<T>> _queue;
  19. private bool _isStopped;
  20. private Exception _error;
  21. private ImmutableList<ScheduledObserver<T>> _observers;
  22. private bool _isDisposed;
  23. private readonly object _gate = new object();
  24. /// <summary>
  25. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size, window and scheduler.
  26. /// </summary>
  27. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  28. /// <param name="window">Maximum time length of the replay buffer.</param>
  29. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  30. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  31. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  32. public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
  33. {
  34. if (bufferSize < 0)
  35. throw new ArgumentOutOfRangeException("bufferSize");
  36. if (window < TimeSpan.Zero)
  37. throw new ArgumentOutOfRangeException("window");
  38. if (scheduler == null)
  39. throw new ArgumentNullException("scheduler");
  40. _bufferSize = bufferSize;
  41. _window = window;
  42. _scheduler = scheduler;
  43. _stopwatch = _scheduler.StartStopwatch();
  44. _queue = new Queue<TimeInterval<T>>();
  45. _isStopped = false;
  46. _error = null;
  47. _observers = new ImmutableList<ScheduledObserver<T>>();
  48. }
  49. /// <summary>
  50. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size and window.
  51. /// </summary>
  52. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  53. /// <param name="window">Maximum time length of the replay buffer.</param>
  54. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  55. public ReplaySubject(int bufferSize, TimeSpan window)
  56. : this(bufferSize, window, SchedulerDefaults.Iteration)
  57. {
  58. }
  59. /// <summary>
  60. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class.
  61. /// </summary>
  62. public ReplaySubject()
  63. : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
  64. {
  65. }
  66. /// <summary>
  67. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified scheduler.
  68. /// </summary>
  69. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  70. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  71. public ReplaySubject(IScheduler scheduler)
  72. : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
  73. {
  74. }
  75. /// <summary>
  76. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size and scheduler.
  77. /// </summary>
  78. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  79. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  80. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  81. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  82. public ReplaySubject(int bufferSize, IScheduler scheduler)
  83. : this(bufferSize, TimeSpan.MaxValue, scheduler)
  84. {
  85. }
  86. /// <summary>
  87. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size.
  88. /// </summary>
  89. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  90. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  91. public ReplaySubject(int bufferSize)
  92. : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
  93. {
  94. }
  95. /// <summary>
  96. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified window and scheduler.
  97. /// </summary>
  98. /// <param name="window">Maximum time length of the replay buffer.</param>
  99. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  100. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  101. /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  102. public ReplaySubject(TimeSpan window, IScheduler scheduler)
  103. : this(InfiniteBufferSize, window, scheduler)
  104. {
  105. }
  106. /// <summary>
  107. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified window.
  108. /// </summary>
  109. /// <param name="window">Maximum time length of the replay buffer.</param>
  110. /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  111. public ReplaySubject(TimeSpan window)
  112. : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
  113. {
  114. }
  115. /// <summary>
  116. /// Indicates whether the subject has observers subscribed to it.
  117. /// </summary>
  118. public bool HasObservers
  119. {
  120. get
  121. {
  122. var observers = _observers;
  123. return observers != null && observers.Data.Length > 0;
  124. }
  125. }
  126. void Trim(TimeSpan now)
  127. {
  128. while (_queue.Count > _bufferSize)
  129. _queue.Dequeue();
  130. while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
  131. _queue.Dequeue();
  132. }
  133. /// <summary>
  134. /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
  135. /// </summary>
  136. /// <param name="value">The value to send to all observers.</param>
  137. public void OnNext(T value)
  138. {
  139. var o = default(ScheduledObserver<T>[]);
  140. lock (_gate)
  141. {
  142. CheckDisposed();
  143. if (!_isStopped)
  144. {
  145. var now = _stopwatch.Elapsed;
  146. _queue.Enqueue(new TimeInterval<T>(value, now));
  147. Trim(now);
  148. o = _observers.Data;
  149. foreach (var observer in o)
  150. observer.OnNext(value);
  151. }
  152. }
  153. if (o != null)
  154. foreach (var observer in o)
  155. observer.EnsureActive();
  156. }
  157. /// <summary>
  158. /// Notifies all subscribed and future observers about the specified exception.
  159. /// </summary>
  160. /// <param name="error">The exception to send to all observers.</param>
  161. /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
  162. public void OnError(Exception error)
  163. {
  164. if (error == null)
  165. throw new ArgumentNullException("error");
  166. var o = default(ScheduledObserver<T>[]);
  167. lock (_gate)
  168. {
  169. CheckDisposed();
  170. if (!_isStopped)
  171. {
  172. var now = _stopwatch.Elapsed;
  173. _isStopped = true;
  174. _error = error;
  175. Trim(now);
  176. o = _observers.Data;
  177. foreach (var observer in o)
  178. observer.OnError(error);
  179. _observers = new ImmutableList<ScheduledObserver<T>>();
  180. }
  181. }
  182. if (o != null)
  183. foreach (var observer in o)
  184. observer.EnsureActive();
  185. }
  186. /// <summary>
  187. /// Notifies all subscribed and future observers about the end of the sequence.
  188. /// </summary>
  189. public void OnCompleted()
  190. {
  191. var o = default(ScheduledObserver<T>[]);
  192. lock (_gate)
  193. {
  194. CheckDisposed();
  195. if (!_isStopped)
  196. {
  197. var now = _stopwatch.Elapsed;
  198. _isStopped = true;
  199. Trim(now);
  200. o = _observers.Data;
  201. foreach (var observer in o)
  202. observer.OnCompleted();
  203. _observers = new ImmutableList<ScheduledObserver<T>>();
  204. }
  205. }
  206. if (o != null)
  207. foreach (var observer in o)
  208. observer.EnsureActive();
  209. }
  210. /// <summary>
  211. /// Subscribes an observer to the subject.
  212. /// </summary>
  213. /// <param name="observer">Observer to subscribe to the subject.</param>
  214. /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
  215. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  216. public IDisposable Subscribe(IObserver<T> observer)
  217. {
  218. if (observer == null)
  219. throw new ArgumentNullException("observer");
  220. var so = new ScheduledObserver<T>(_scheduler, observer);
  221. var n = 0;
  222. var subscription = new RemovableDisposable(this, so);
  223. lock (_gate)
  224. {
  225. CheckDisposed();
  226. //
  227. // Notice the v1.x behavior of always calling Trim is preserved here.
  228. //
  229. // This may be subject (pun intended) of debate: should this policy
  230. // only be applied while the sequence is active? With the current
  231. // behavior, a sequence will "die out" after it has terminated by
  232. // continuing to drop OnNext notifications from the queue.
  233. //
  234. // In v1.x, this behavior was due to trimming based on the clock value
  235. // returned by scheduler.Now, applied to all but the terminal message
  236. // in the queue. Using the IStopwatch has the same effect. Either way,
  237. // we guarantee the final notification will be observed, but there's
  238. // no way to retain the buffer directly. One approach is to use the
  239. // time-based TakeLast operator and apply an unbounded ReplaySubject
  240. // to it.
  241. //
  242. // To conclude, we're keeping the behavior as-is for compatibility
  243. // reasons with v1.x.
  244. //
  245. Trim(_stopwatch.Elapsed);
  246. _observers = _observers.Add(so);
  247. n = _queue.Count;
  248. foreach (var item in _queue)
  249. so.OnNext(item.Value);
  250. if (_error != null)
  251. {
  252. n++;
  253. so.OnError(_error);
  254. }
  255. else if (_isStopped)
  256. {
  257. n++;
  258. so.OnCompleted();
  259. }
  260. }
  261. so.EnsureActive(n);
  262. return subscription;
  263. }
  264. void Unsubscribe(ScheduledObserver<T> observer)
  265. {
  266. lock (_gate)
  267. {
  268. if (!_isDisposed)
  269. _observers = _observers.Remove(observer);
  270. }
  271. }
  272. sealed class RemovableDisposable : IDisposable
  273. {
  274. private readonly ReplaySubject<T> _subject;
  275. private readonly ScheduledObserver<T> _observer;
  276. public RemovableDisposable(ReplaySubject<T> subject, ScheduledObserver<T> observer)
  277. {
  278. _subject = subject;
  279. _observer = observer;
  280. }
  281. public void Dispose()
  282. {
  283. _observer.Dispose();
  284. _subject.Unsubscribe(_observer);
  285. }
  286. }
  287. void CheckDisposed()
  288. {
  289. if (_isDisposed)
  290. throw new ObjectDisposedException(string.Empty);
  291. }
  292. /// <summary>
  293. /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> class and unsubscribe all observers.
  294. /// </summary>
  295. public void Dispose()
  296. {
  297. lock (_gate)
  298. {
  299. _isDisposed = true;
  300. _observers = null;
  301. }
  302. }
  303. }
  304. }