ReplaySubject.cs 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. namespace System.Reactive.Subjects
  7. {
  8. /// <summary>
  9. /// Represents an object that is both an observable sequence as well as an observer.
  10. /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
  11. /// </summary>
  12. /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  13. public sealed class ReplaySubject<T> : SubjectBase<T>, IDisposable
  14. {
  15. #region Fields
  16. /// <summary>
  17. /// Underlying optimized implementation of the replay subject.
  18. /// </summary>
  19. private readonly SubjectBase<T> _implementation;
  20. #endregion
  21. #region Constructors
  22. #region All
  /// <summary>
  /// Creates a new <see cref="ReplaySubject{T}"/> without specifying a buffer size, window or scheduler.
  /// </summary>
  /// <remarks>
  /// Forwards to the buffer-size constructor, passing <c>int.MaxValue</c> as the size.
  /// </remarks>
  public ReplaySubject()
      : this(bufferSize: int.MaxValue)
  {
  }
  /// <summary>
  /// Creates a new <see cref="ReplaySubject{T}"/> that uses the given scheduler.
  /// </summary>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public ReplaySubject(IScheduler scheduler)
  {
      // The time-based implementation is the one that accepts a scheduler.
      this._implementation = new ReplayByTime(scheduler);
  }
  39. #endregion
  40. #region Count
  /// <summary>
  /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size.
  /// </summary>
  /// <remarks>
  /// A buffer size of 1 selects the single-value implementation, <c>int.MaxValue</c> selects the
  /// implementation that never trims, and any other value selects the bounded queue implementation.
  /// </remarks>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize)
  {
      //
      // Validate explicitly so the exception names this constructor's parameter. Without this
      // check a negative size is only rejected by the Queue<T> constructor further down, which
      // reports its own "capacity" parameter instead.
      //
      if (bufferSize < 0)
      {
          throw new ArgumentOutOfRangeException("bufferSize");
      }

      switch (bufferSize)
      {
          case 1:
              _implementation = new ReplayOne();
              break;
          case int.MaxValue:
              _implementation = new ReplayAll();
              break;
          default:
              _implementation = new ReplayMany(bufferSize);
              break;
      }
  }
  /// <summary>
  /// Creates a new <see cref="ReplaySubject{T}"/> with the given buffer size and scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize, IScheduler scheduler)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      this._implementation = new ReplayByTime(bufferSize, scheduler);
  }
  72. #endregion
  73. #region Time
  /// <summary>
  /// Creates a new <see cref="ReplaySubject{T}"/> with the given time window.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(TimeSpan window)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      this._implementation = new ReplayByTime(window);
  }
  /// <summary>
  /// Creates a new <see cref="ReplaySubject{T}"/> with the given time window and scheduler.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(TimeSpan window, IScheduler scheduler)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      this._implementation = new ReplayByTime(window, scheduler);
  }
  94. #endregion
  95. #region Count & Time
  /// <summary>
  /// Creates a new <see cref="ReplaySubject{T}"/> with the given buffer size and time window.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      this._implementation = new ReplayByTime(bufferSize, window);
  }
  /// <summary>
  /// Creates a new <see cref="ReplaySubject{T}"/> with the given buffer size, time window and scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
  {
      // Argument validation is performed by the ReplayByTime constructor.
      this._implementation = new ReplayByTime(bufferSize, window, scheduler);
  }
  118. #endregion
  119. #endregion
  120. #region Properties
  /// <summary>
  /// Gets a value indicating whether any observers are currently subscribed to the subject.
  /// The answer comes from the underlying implementation.
  /// </summary>
  public override bool HasObservers
  {
      get
      {
          return this._implementation.HasObservers;
      }
  }
  /// <summary>
  /// Gets a value indicating whether the subject has been disposed.
  /// The answer comes from the underlying implementation.
  /// </summary>
  public override bool IsDisposed
  {
      get
      {
          return this._implementation.IsDisposed;
      }
  }
  135. #endregion
  136. #region Methods
  137. #region IObserver<T> implementation
  /// <summary>
  /// Publishes an element to all subscribed and future observers by forwarding it to the
  /// underlying implementation.
  /// </summary>
  /// <param name="value">The value to send to all observers.</param>
  public override void OnNext(T value)
  {
      this._implementation.OnNext(value);
  }
  /// <summary>
  /// Publishes an exception to all subscribed and future observers by forwarding it to the
  /// underlying implementation.
  /// </summary>
  /// <param name="error">The exception to send to all observers.</param>
  /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
  public override void OnError(Exception error)
  {
      // Reject a missing exception before it reaches the implementation.
      if (error == null)
      {
          throw new ArgumentNullException("error");
      }

      this._implementation.OnError(error);
  }
  /// <summary>
  /// Signals the end of the sequence to all subscribed and future observers by forwarding the
  /// notification to the underlying implementation.
  /// </summary>
  public override void OnCompleted()
  {
      this._implementation.OnCompleted();
  }
  164. #endregion
  165. #region IObservable<T> implementation
  /// <summary>
  /// Attaches an observer to the subject.
  /// </summary>
  /// <param name="observer">Observer to subscribe to the subject.</param>
  /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public override IDisposable Subscribe(IObserver<T> observer)
  {
      // Reject a missing observer before it reaches the implementation.
      if (observer == null)
      {
          throw new ArgumentNullException("observer");
      }

      var subscription = this._implementation.Subscribe(observer);
      return subscription;
  }
  178. #endregion
  179. #region IDisposable implementation
  /// <summary>
  /// Releases all resources used by the current instance of the <see cref="ReplaySubject{T}"/> class and
  /// unsubscribes all observers, by disposing the underlying implementation.
  /// </summary>
  public override void Dispose()
  {
      this._implementation.Dispose();
  }
  187. #endregion
  188. #endregion
  /// <summary>
  /// Common base of the replay implementations. It owns the observer list, the terminal state and
  /// the disposed flag, all guarded by a single gate. Buffering itself is left to derived classes
  /// through <see cref="Next"/>, <see cref="Replay"/>, <see cref="Trim"/> and <see cref="DisposeCore"/>.
  /// </summary>
  private abstract class ReplayBase : SubjectBase<T>
  {
      // Guards every field below as well as the calls into the derived class.
      private readonly object _gate = new object();

      // Currently subscribed observers; set to null by Dispose.
      private ImmutableList<IScheduledObserver<T>> _observers;

      // True once OnError or OnCompleted has been accepted.
      private bool _isStopped;

      // Exception recorded by OnError, if any.
      private Exception _error;

      private bool _isDisposed;

      public ReplayBase()
      {
          _error = null;
          _isStopped = false;
          _observers = ImmutableList<IScheduledObserver<T>>.Empty;
      }

      /// <summary>
      /// Indicates whether at least one observer is in the current observer list.
      /// </summary>
      public override bool HasObservers
      {
          get
          {
              // Read the field once; Dispose may replace it with null.
              var snapshot = _observers;
              if (snapshot == null)
              {
                  return false;
              }

              return snapshot.Data.Length > 0;
          }
      }

      /// <summary>
      /// Indicates whether <see cref="Dispose"/> has been called.
      /// </summary>
      public override bool IsDisposed
      {
          get
          {
              lock (_gate)
              {
                  return _isDisposed;
              }
          }
      }

      /// <summary>
      /// Buffers the value, trims the buffer and hands the value to every current observer;
      /// the observers are activated after the gate has been released.
      /// </summary>
      /// <param name="value">The value to record and forward.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override void OnNext(T value)
      {
          IScheduledObserver<T>[] targets;

          lock (_gate)
          {
              CheckDisposed();

              // Values arriving after a terminal notification are dropped.
              if (_isStopped)
              {
                  return;
              }

              Next(value);
              Trim();

              targets = _observers.Data;
              for (var i = 0; i < targets.Length; i++)
              {
                  targets[i].OnNext(value);
              }
          }

          Activate(targets);
      }

      /// <summary>
      /// Records the error, marks the subject as stopped, hands the error to every current
      /// observer and clears the observer list; the observers are activated after the gate
      /// has been released.
      /// </summary>
      /// <param name="error">The exception to record and forward.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override void OnError(Exception error)
      {
          IScheduledObserver<T>[] targets;

          lock (_gate)
          {
              CheckDisposed();

              // Only the first terminal notification is honored.
              if (_isStopped)
              {
                  return;
              }

              _isStopped = true;
              _error = error;
              Trim();

              targets = _observers.Data;
              for (var i = 0; i < targets.Length; i++)
              {
                  targets[i].OnError(error);
              }

              _observers = ImmutableList<IScheduledObserver<T>>.Empty;
          }

          Activate(targets);
      }

      /// <summary>
      /// Marks the subject as stopped, hands the completion to every current observer and
      /// clears the observer list; the observers are activated after the gate has been released.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override void OnCompleted()
      {
          IScheduledObserver<T>[] targets;

          lock (_gate)
          {
              CheckDisposed();

              // Only the first terminal notification is honored.
              if (_isStopped)
              {
                  return;
              }

              _isStopped = true;
              Trim();

              targets = _observers.Data;
              for (var i = 0; i < targets.Length; i++)
              {
                  targets[i].OnCompleted();
              }

              _observers = ImmutableList<IScheduledObserver<T>>.Empty;
          }

          Activate(targets);
      }

      /// <summary>
      /// Wraps the observer, replays the buffered notifications (plus the terminal one, if any)
      /// into it and, when the subject is still active, adds it to the observer list.
      /// </summary>
      /// <param name="observer">Observer to subscribe.</param>
      /// <returns>
      /// A disposable that removes the observer again, or <c>Disposable.Empty</c> when the
      /// subject had already stopped.
      /// </returns>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override IDisposable Subscribe(IObserver<T> observer)
      {
          var scheduled = CreateScheduledObserver(observer);
          var pending = 0;
          IDisposable subscription = Disposable.Empty;

          lock (_gate)
          {
              CheckDisposed();

              //
              // Trim is called unconditionally here, which preserves the v1.x behavior.
              //
              // Whether trimming should only apply while the sequence is active is debatable.
              // As it stands, a sequence "dies out" after it has terminated, because OnNext
              // notifications keep being dropped from the queue.
              //
              // In v1.x this followed from trimming against the clock value returned by
              // scheduler.Now, applied to everything but the terminal message in the queue.
              // Using the IStopwatch has the same effect. Either way the final notification
              // is guaranteed to be observed, but the buffer cannot be retained directly.
              // One approach is to apply an unbounded ReplaySubject to the time-based
              // TakeLast operator.
              //
              // The behavior is kept as-is for compatibility with v1.x.
              //
              Trim();

              pending = Replay(scheduled);

              if (_isStopped)
              {
                  // _error is only ever assigned together with _isStopped, so testing it
                  // inside this branch distinguishes the two terminal notifications.
                  pending++;

                  if (_error != null)
                  {
                      scheduled.OnError(_error);
                  }
                  else
                  {
                      scheduled.OnCompleted();
                  }
              }
              else
              {
                  subscription = new Subscription(this, scheduled);
                  _observers = _observers.Add(scheduled);
              }
          }

          // Activate the observer outside the gate.
          scheduled.EnsureActive(pending);

          return subscription;
      }

      /// <summary>
      /// Marks the subject as disposed, drops the observer list and lets the derived class
      /// release its buffer.
      /// </summary>
      public override void Dispose()
      {
          lock (_gate)
          {
              _isDisposed = true;
              _observers = null;
              DisposeCore();
          }
      }

      /// <summary>Releases the buffer held by the derived class; called under the gate.</summary>
      protected abstract void DisposeCore();

      /// <summary>Records a value in the replay buffer; called under the gate.</summary>
      protected abstract void Next(T value);

      /// <summary>Sends the buffered values to the observer and returns how many were sent; called under the gate.</summary>
      protected abstract int Replay(IObserver<T> observer);

      /// <summary>Applies the buffer trimming policy; called under the gate.</summary>
      protected abstract void Trim();

      /// <summary>Wraps a subscribing observer in the scheduled observer type used by the derived class.</summary>
      protected abstract IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer);

      /// <summary>
      /// Calls <c>EnsureActive</c> on every observer in the given array.
      /// </summary>
      private static void Activate(IScheduledObserver<T>[] targets)
      {
          for (var i = 0; i < targets.Length; i++)
          {
              targets[i].EnsureActive();
          }
      }

      /// <summary>
      /// Throws when the subject has been disposed. Callers hold the gate.
      /// </summary>
      private void CheckDisposed()
      {
          if (!_isDisposed)
          {
              return;
          }

          throw new ObjectDisposedException(string.Empty);
      }

      /// <summary>
      /// Removes the observer from the observer list, unless the subject has been disposed.
      /// </summary>
      private void Unsubscribe(IScheduledObserver<T> observer)
      {
          lock (_gate)
          {
              // After Dispose the list is null, so there is nothing to remove from.
              if (_isDisposed)
              {
                  return;
              }

              _observers = _observers.Remove(observer);
          }
      }

#if NOTYET // TODO: Expose internal notifications similar to BehaviorSubject<T>.TryGetValue?
      public bool TryGetNotifications(out IList<Notification<T>> notifications)
      {
          lock (_gate)
          {
              if (_isDisposed)
              {
                  notifications = null;
                  return false;
              }
              else
              {
                  var res = new List<Notification<T>>();

                  var materializer = Observer.Create<T>(
                      x => res.Add(Notification.CreateOnNext(x)),
                      ex => res.Add(Notification.CreateOnError<T>(ex)),
                      () => res.Add(Notification.CreateOnCompleted<T>())
                  );

                  Replay(materializer);

                  if (_error != null)
                  {
                      materializer.OnError(_error);
                  }
                  else if (_isStopped)
                  {
                      materializer.OnCompleted();
                  }

                  notifications = res;
                  return true;
              }
          }
      }
#endif

      /// <summary>
      /// Handle returned from <see cref="Subscribe"/>; disposing it disposes the scheduled
      /// observer and removes it from the subject.
      /// </summary>
      private sealed class Subscription : IDisposable
      {
          private readonly ReplayBase _subject;
          private readonly IScheduledObserver<T> _observer;

          public Subscription(ReplayBase subject, IScheduledObserver<T> observer)
          {
              _observer = observer;
              _subject = subject;
          }

          public void Dispose()
          {
              _observer.Dispose();
              _subject.Unsubscribe(_observer);
          }
      }
  }
  /// <summary>
  /// Original implementation of the ReplaySubject with time based operations (scheduling, stopwatch,
  /// buffer-by-time). Every buffered value is stored together with the stopwatch reading taken when
  /// it arrived.
  /// </summary>
  private sealed class ReplayByTime : ReplayBase
  {
      // Buffer size used by the constructors that do not take one.
      private const int InfiniteBufferSize = int.MaxValue;

      private readonly int _bufferSize;
      private readonly TimeSpan _window;
      private readonly IScheduler _scheduler;
      private readonly IStopwatch _stopwatch;
      private readonly Queue<TimeInterval<T>> _queue;

      /// <summary>
      /// Creates the implementation with an explicit buffer size, window and scheduler.
      /// </summary>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
      public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
      {
          if (bufferSize < 0)
          {
              throw new ArgumentOutOfRangeException("bufferSize");
          }

          if (window < TimeSpan.Zero)
          {
              throw new ArgumentOutOfRangeException("window");
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException("scheduler");
          }

          _bufferSize = bufferSize;
          _window = window;
          _scheduler = scheduler;

          // The stopwatch is started from the scheduler that was just stored.
          _stopwatch = _scheduler.StartStopwatch();
          _queue = new Queue<TimeInterval<T>>();
      }

      /// <summary>Uses <c>SchedulerDefaults.Iteration</c> as the scheduler.</summary>
      public ReplayByTime(int bufferSize, TimeSpan window)
          : this(bufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      /// <summary>Uses <c>int.MaxValue</c> as the buffer size and <c>TimeSpan.MaxValue</c> as the window.</summary>
      public ReplayByTime(IScheduler scheduler)
          : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      /// <summary>Uses <c>TimeSpan.MaxValue</c> as the window.</summary>
      public ReplayByTime(int bufferSize, IScheduler scheduler)
          : this(bufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      /// <summary>Uses <c>int.MaxValue</c> as the buffer size.</summary>
      public ReplayByTime(TimeSpan window, IScheduler scheduler)
          : this(InfiniteBufferSize, window, scheduler)
      {
      }

      /// <summary>Uses <c>int.MaxValue</c> as the buffer size and <c>SchedulerDefaults.Iteration</c> as the scheduler.</summary>
      public ReplayByTime(TimeSpan window)
          : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      /// <summary>
      /// Wraps the observer in a <c>ScheduledObserver</c> bound to this subject's scheduler.
      /// </summary>
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
      {
          var scheduled = new ScheduledObserver<T>(_scheduler, observer);
          return scheduled;
      }

      /// <summary>
      /// Empties the replay queue.
      /// </summary>
      protected override void DisposeCore()
      {
          _queue.Clear();
      }

      /// <summary>
      /// Enqueues the value together with the current stopwatch reading.
      /// </summary>
      protected override void Next(T value)
      {
          var timestamp = _stopwatch.Elapsed;
          var entry = new TimeInterval<T>(value, timestamp);
          _queue.Enqueue(entry);
      }

      /// <summary>
      /// Sends every queued value to the observer, oldest first.
      /// </summary>
      /// <returns>The number of values that were in the queue.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          var buffered = _queue.Count;

          foreach (var entry in _queue)
          {
              observer.OnNext(entry.Value);
          }

          return buffered;
      }

      /// <summary>
      /// Drops the oldest entries until the queue fits the buffer size, then drops entries whose
      /// age exceeds the window.
      /// </summary>
      protected override void Trim()
      {
          var now = _stopwatch.Elapsed;

          // Enforce the element count limit first.
          while (_queue.Count > _bufferSize)
          {
              _queue.Dequeue();
          }

          // Then enforce the age limit, measured against the stopwatch reading taken above.
          while (_queue.Count > 0)
          {
              var age = now.Subtract(_queue.Peek().Interval);
              if (age.CompareTo(_window) <= 0)
              {
                  break;
              }

              _queue.Dequeue();
          }
      }
  }
  //
  // Below are the non-time based implementations.
  // These remove the need for the scheduler indirection, ScheduledObserver<T>, the stopwatch, TimeInterval<T>, and ensuring the scheduled observers are active after each action.
  // The ReplayOne implementation also removes the need to even have a queue.
  //
  /// <summary>
  /// Replay implementation that remembers only the most recent value, held in a single field
  /// instead of a queue.
  /// </summary>
  private sealed class ReplayOne : ReplayBufferBase
  {
      // Most recent value; only meaningful once _hasValue is true.
      private T _value;

      // Becomes true when the first value has been recorded.
      private bool _hasValue;

      /// <summary>
      /// Does nothing; a single slot never needs trimming.
      /// </summary>
      protected override void Trim()
      {
      }

      /// <summary>
      /// Overwrites the remembered value.
      /// </summary>
      protected override void Next(T value)
      {
          _value = value;
          _hasValue = true;
      }

      /// <summary>
      /// Sends the remembered value to the observer, if there is one.
      /// </summary>
      /// <returns>1 when a value was sent; otherwise 0.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          if (!_hasValue)
          {
              return 0;
          }

          observer.OnNext(_value);
          return 1;
      }

      /// <summary>
      /// Resets the remembered value to its default.
      /// </summary>
      protected override void DisposeCore()
      {
          _value = default(T);
      }
  }
  /// <summary>
  /// Replay implementation that keeps at most a fixed number of values in a queue.
  /// </summary>
  private sealed class ReplayMany : ReplayManyBase
  {
      // Maximum number of values retained after trimming.
      private readonly int _bufferSize;

      /// <summary>
      /// Creates the implementation; the buffer size is also passed to the base class as the queue size.
      /// </summary>
      public ReplayMany(int bufferSize)
          : base(bufferSize)
      {
          _bufferSize = bufferSize;
      }

      /// <summary>
      /// Drops the oldest values until the queue holds no more than the buffer size.
      /// </summary>
      protected override void Trim()
      {
          while (_queue.Count > _bufferSize)
          {
              _queue.Dequeue();
          }
      }
  }
  /// <summary>
  /// Replay implementation that never removes values from its queue.
  /// </summary>
  private sealed class ReplayAll : ReplayManyBase
  {
      /// <summary>
      /// Creates the implementation, passing 0 to the base class as the queue size.
      /// </summary>
      public ReplayAll()
          : base(0)
      {
      }

      /// <summary>
      /// Does nothing; every value is kept.
      /// </summary>
      protected override void Trim()
      {
      }
  }
  /// <summary>
  /// Base of the implementations that do not use a scheduler; observers are wrapped in a
  /// <c>FastImmediateObserver</c>.
  /// </summary>
  private abstract class ReplayBufferBase : ReplayBase
  {
      /// <summary>
      /// Wraps the observer in a <c>FastImmediateObserver</c>.
      /// </summary>
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
      {
          var wrapper = new FastImmediateObserver<T>(observer);
          return wrapper;
      }

      /// <summary>
      /// Does nothing at this level; derived classes override it when they hold a buffer.
      /// </summary>
      protected override void DisposeCore()
      {
      }
  }
  /// <summary>
  /// Base of the queue-backed implementations; the trimming policy is left to derived classes.
  /// </summary>
  private abstract class ReplayManyBase : ReplayBufferBase
  {
      // Buffered values, oldest first. Derived classes trim it directly.
      protected readonly Queue<T> _queue;

      /// <summary>
      /// Creates the queue with an initial capacity of <paramref name="queueSize"/>, capped at 64.
      /// </summary>
      protected ReplayManyBase(int queueSize)
          : base()
      {
          var initialCapacity = Math.Min(queueSize, 64);
          _queue = new Queue<T>(initialCapacity);
      }

      /// <summary>
      /// Appends the value to the queue.
      /// </summary>
      protected override void Next(T value)
      {
          _queue.Enqueue(value);
      }

      /// <summary>
      /// Sends every queued value to the observer, oldest first.
      /// </summary>
      /// <returns>The number of values that were in the queue.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          var buffered = _queue.Count;

          foreach (var value in _queue)
          {
              observer.OnNext(value);
          }

          return buffered;
      }

      /// <summary>
      /// Empties the queue.
      /// </summary>
      protected override void DisposeCore()
      {
          _queue.Clear();
      }
  }
  582. }
  /// <summary>
  /// Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
  /// Notifications are queued under a gate and drained by whichever thread calls
  /// <see cref="EnsureActive()"/> while no other thread is draining.
  /// </summary>
  /// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
  class FastImmediateObserver<T> : IScheduledObserver<T>
  {
      /// <summary>
      /// Gate to control ownership transfer and protect data structures.
      /// </summary>
      private readonly object _gate = new object();

      /// <summary>
      /// Observer to forward notifications to. Replaced by a no-op observer in <see cref="Done"/>.
      /// </summary>
      private volatile IObserver<T> _observer;

      /// <summary>
      /// Queue that producers enqueue OnNext notifications into.
      /// </summary>
      private Queue<T> _queue = new Queue<T>();

      /// <summary>
      /// Standby queue swapped in for _queue when the owner takes the pending notifications.
      /// This allows queues to be reused when the initial replay doesn't suffice to catch up.
      /// </summary>
      private Queue<T> _queue2;

      /// <summary>
      /// Exception passed to an OnError notification, if any.
      /// </summary>
      private Exception _error;

      /// <summary>
      /// Indicates whether an OnCompleted notification was received.
      /// </summary>
      private bool _done;

      /// <summary>
      /// Indicates whether some thread currently owns the draining of the queued notifications.
      /// </summary>
      private bool _busy;

      /// <summary>
      /// Indicates whether a failure occurred while the owner was draining. Once set, no
      /// further work is accepted or processed.
      /// </summary>
      private bool _hasFaulted;

      /// <summary>
      /// Creates a new scheduled observer that proxies to the specified observer.
      /// </summary>
      /// <param name="observer">Observer to forward notifications to.</param>
      public FastImmediateObserver(IObserver<T> observer)
      {
          _observer = observer;
      }

      /// <summary>
      /// Disposes the observer by swapping the target observer for a no-op observer.
      /// </summary>
      public void Dispose()
      {
          Done();
      }

      /// <summary>
      /// Notifies the observer of pending work. Either the current owner picks up the newly
      /// enqueued notifications, or the calling thread becomes the owner and drains the queue.
      /// </summary>
      public void EnsureActive()
      {
          EnsureActive(1);
      }

      /// <summary>
      /// Notifies the observer of pending work. Either the current owner picks up the newly
      /// enqueued notifications, or the calling thread becomes the owner and drains the queue.
      /// </summary>
      /// <param name="count">The number of enqueued notifications to process (ignored).</param>
      public void EnsureActive(int count)
      {
          if (!TryBecomeOwner())
          {
              return;
          }

          for (; ; )
          {
              Queue<T> batch = null;
              Exception failure = null;
              var completed = false;

              //
              // Take the pending notifications away from the producer side.
              //
              lock (_gate)
              {
                  if (_queue.Count > 0)
                  {
                      //
                      // Hand the producers a recycled or fresh queue and keep the filled one.
                      // The standby slot stays empty until the batch has been drained.
                      //
                      batch = _queue;
                      _queue = _queue2 ?? new Queue<T>();
                      _queue2 = null;
                  }

                  if (_error != null)
                  {
                      failure = _error;
                  }
                  else if (_done)
                  {
                      completed = true;
                  }
                  else if (batch == null)
                  {
                      //
                      // Nothing left to do; give up ownership so a later caller can take over.
                      //
                      _busy = false;
                      return;
                  }
              }

              try
              {
                  if (batch != null)
                  {
                      //
                      // Deliver the OnNext notifications that were taken above.
                      //
                      while (batch.Count > 0)
                      {
                          _observer.OnNext(batch.Dequeue());
                      }

                      //
                      // The batch is empty now, so it becomes the standby queue for a later swap.
                      //
                      lock (_gate)
                      {
                          _queue2 = batch;
                      }
                  }

                  //
                  // Deliver the terminal notification, if any. Ownership is deliberately not
                  // released afterwards, so no further processing ever takes place.
                  //
                  if (failure != null)
                  {
                      Done().OnError(failure);
                      return;
                  }

                  if (completed)
                  {
                      Done().OnCompleted();
                      return;
                  }
              }
              catch
              {
                  //
                  // The downstream observer threw: stop accepting work, discard what is
                  // queued and let the exception propagate to the caller.
                  //
                  lock (_gate)
                  {
                      _hasFaulted = true;
                      _queue.Clear();
                  }

                  throw;
              }
          }
      }

      /// <summary>
      /// Enqueues an OnCompleted notification, unless the observer has faulted.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _done = true;
          }
      }

      /// <summary>
      /// Enqueues an OnError notification, unless the observer has faulted.
      /// </summary>
      /// <param name="error">Error of the notification.</param>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _error = error;
          }
      }

      /// <summary>
      /// Enqueues an OnNext notification, unless the observer has faulted.
      /// </summary>
      /// <param name="value">Value of the notification.</param>
      public void OnNext(T value)
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _queue.Enqueue(value);
          }
      }

      /// <summary>
      /// Claims ownership of the drain loop for the calling thread.
      /// </summary>
      /// <returns>
      /// <c>true</c> when the caller became the owner; <c>false</c> when the observer has
      /// faulted or another thread already owns the loop.
      /// </returns>
      private bool TryBecomeOwner()
      {
          lock (_gate)
          {
              if (_hasFaulted || _busy)
              {
                  return false;
              }

              _busy = true;
              return true;
          }
      }

      /// <summary>
      /// Terminates the observer by atomically swapping in a no-op observer, thus preventing
      /// future notifications from going out.
      /// </summary>
      /// <returns>The observer that was installed before the swap, to send terminal notifications to.</returns>
      private IObserver<T> Done()
      {
#pragma warning disable 0420
          return Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
#pragma warning restore 0420
      }
  }
  831. }