1
0

ReplaySubject.cs 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive.Subjects
  9. {
  10. /// <summary>
  11. /// Represents an object that is both an observable sequence as well as an observer.
  12. /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
  13. /// </summary>
  14. /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  15. public sealed class ReplaySubject<T> : SubjectBase<T>, IDisposable
  16. {
  17. #region Fields
  18. /// <summary>
  19. /// Underlying optimized implementation of the replay subject.
  20. /// </summary>
  21. private readonly SubjectBase<T> _implementation;
  22. #endregion
  23. #region Constructors
  24. #region All
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class,
  /// delegating to the buffer-size constructor with <see cref="int.MaxValue"/> as the buffer size.
  /// </summary>
  public ReplaySubject()
      : this(bufferSize: int.MaxValue)
  {
  }
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class
  /// that notifies its observers on the given scheduler.
  /// </summary>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public ReplaySubject(IScheduler scheduler)
  {
      // The scheduler-aware implementation is always the time-based one.
      _implementation = new ReplayByTime(scheduler: scheduler);
  }
  41. #endregion
  42. #region Count
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size.
  /// </summary>
  /// <remarks>
  /// A specialized implementation is chosen from the buffer size: a single-slot buffer for 1,
  /// a never-trimmed buffer for <see cref="int.MaxValue"/>, and a count-trimmed queue otherwise.
  /// </remarks>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize)
  {
      //
      // Validate here so the exception names the caller's parameter. Without this check a
      // negative size is only rejected by the Queue<T> constructor further down, which
      // reports its own "capacity" parameter instead.
      //
      if (bufferSize < 0)
          throw new ArgumentOutOfRangeException(nameof(bufferSize));

      switch (bufferSize)
      {
          case 1:
              _implementation = new ReplayOne();
              break;
          case int.MaxValue:
              _implementation = new ReplayAll();
              break;
          default:
              _implementation = new ReplayMany(bufferSize);
              break;
      }
  }
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class
  /// with a bounded replay buffer whose observers are notified on the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  public ReplaySubject(int bufferSize, IScheduler scheduler)
  {
      _implementation = new ReplayByTime(bufferSize: bufferSize, scheduler: scheduler);
  }
  74. #endregion
  75. #region Time
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class
  /// whose replay buffer is limited by age.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(TimeSpan window)
  {
      _implementation = new ReplayByTime(window: window);
  }
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class
  /// whose replay buffer is limited by age and whose observers are notified on the given scheduler.
  /// </summary>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(TimeSpan window, IScheduler scheduler)
  {
      _implementation = new ReplayByTime(window: window, scheduler: scheduler);
  }
  96. #endregion
  97. #region Count & Time
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class
  /// whose replay buffer is limited both by element count and by age.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window)
  {
      _implementation = new ReplayByTime(bufferSize: bufferSize, window: window);
  }
  /// <summary>
  /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class
  /// whose replay buffer is limited both by element count and by age, and whose observers are
  /// notified on the given scheduler.
  /// </summary>
  /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  /// <param name="window">Maximum time length of the replay buffer.</param>
  /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
  {
      _implementation = new ReplayByTime(bufferSize: bufferSize, window: window, scheduler: scheduler);
  }
  120. #endregion
  121. #endregion
  122. #region Properties
  /// <summary>
  /// Gets a value indicating whether any observer is currently subscribed, as reported by the
  /// underlying implementation.
  /// </summary>
  public override bool HasObservers => _implementation.HasObservers;
  /// <summary>
  /// Gets a value indicating whether the subject has been disposed, as reported by the
  /// underlying implementation.
  /// </summary>
  public override bool IsDisposed => _implementation.IsDisposed;
  137. #endregion
  138. #region Methods
  139. #region IObserver<T> implementation
  /// <summary>
  /// Forwards the arrival of an element to the underlying implementation, which notifies all
  /// subscribed and future observers.
  /// </summary>
  /// <param name="value">The value to send to all observers.</param>
  public override void OnNext(T value) => _implementation.OnNext(value);
  /// <summary>
  /// Forwards the specified exception to the underlying implementation, which notifies all
  /// subscribed and future observers.
  /// </summary>
  /// <param name="error">The exception to send to all observers.</param>
  /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
  public override void OnError(Exception error)
  {
      // Reject a missing exception before touching the implementation.
      if (error == null)
      {
          throw new ArgumentNullException(nameof(error));
      }

      _implementation.OnError(error);
  }
  /// <summary>
  /// Forwards the end of the sequence to the underlying implementation, which notifies all
  /// subscribed and future observers.
  /// </summary>
  public override void OnCompleted() => _implementation.OnCompleted();
  166. #endregion
  167. #region IObservable<T> implementation
  /// <summary>
  /// Subscribes an observer to the subject by handing it to the underlying implementation.
  /// </summary>
  /// <param name="observer">Observer to subscribe to the subject.</param>
  /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public override IDisposable Subscribe(IObserver<T> observer)
  {
      // Reject a missing observer before touching the implementation.
      if (observer == null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      var subscription = _implementation.Subscribe(observer);
      return subscription;
  }
  180. #endregion
  181. #region IDisposable implementation
  /// <summary>
  /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> class
  /// and unsubscribes all observers by disposing the underlying implementation.
  /// </summary>
  public override void Dispose() => _implementation.Dispose();
  189. #endregion
  190. #endregion
  /// <summary>
  /// Shared core of the replay implementations. It owns the observer list, the terminal state and
  /// the gate protecting them; derived classes supply the buffer (<see cref="Next"/>,
  /// <see cref="Replay"/>, <see cref="Trim"/>) and the kind of scheduled observer to use.
  /// </summary>
  private abstract class ReplayBase : SubjectBase<T>
  {
      // Guards every field below as well as the buffer operations of derived classes.
      private readonly object _gate = new object();

      private ImmutableList<IScheduledObserver<T>> _observers;
      private bool _isStopped;
      private Exception _error;
      private bool _isDisposed;

      public ReplayBase()
      {
          _error = null;
          _isStopped = false;
          _observers = ImmutableList<IScheduledObserver<T>>.Empty;
      }

      /// <summary>
      /// Indicates whether at least one observer is subscribed. Read without taking the gate.
      /// </summary>
      public override bool HasObservers
      {
          get
          {
              // Work on a snapshot: Dispose replaces the list with null.
              var snapshot = _observers;
              if (snapshot == null)
              {
                  return false;
              }

              return snapshot.Data.Length > 0;
          }
      }

      /// <summary>
      /// Indicates whether <see cref="Dispose"/> has been called.
      /// </summary>
      public override bool IsDisposed
      {
          get
          {
              lock (_gate)
              {
                  return _isDisposed;
              }
          }
      }

      /// <summary>
      /// Buffers the value, trims the buffer and queues the value on every current observer;
      /// the observers are activated after the gate has been released.
      /// </summary>
      /// <param name="value">The value to record and broadcast.</param>
      public override void OnNext(T value)
      {
          IScheduledObserver<T>[] targets = null;

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  Next(value);
                  Trim();

                  targets = _observers.Data;
                  for (var i = 0; i < targets.Length; i++)
                  {
                      targets[i].OnNext(value);
                  }
              }
          }

          Activate(targets);
      }

      /// <summary>
      /// Records the error as the terminal notification, queues it on every current observer and
      /// clears the observer list; the observers are activated after the gate has been released.
      /// </summary>
      /// <param name="error">The exception to record and broadcast.</param>
      public override void OnError(Exception error)
      {
          IScheduledObserver<T>[] targets = null;

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  _isStopped = true;
                  _error = error;
                  Trim();

                  targets = _observers.Data;
                  for (var i = 0; i < targets.Length; i++)
                  {
                      targets[i].OnError(error);
                  }

                  _observers = ImmutableList<IScheduledObserver<T>>.Empty;
              }
          }

          Activate(targets);
      }

      /// <summary>
      /// Marks the sequence as completed, queues the completion on every current observer and
      /// clears the observer list; the observers are activated after the gate has been released.
      /// </summary>
      public override void OnCompleted()
      {
          IScheduledObserver<T>[] targets = null;

          lock (_gate)
          {
              CheckDisposed();

              if (!_isStopped)
              {
                  _isStopped = true;
                  Trim();

                  targets = _observers.Data;
                  for (var i = 0; i < targets.Length; i++)
                  {
                      targets[i].OnCompleted();
                  }

                  _observers = ImmutableList<IScheduledObserver<T>>.Empty;
              }
          }

          Activate(targets);
      }

      /// <summary>
      /// Wraps the observer in a scheduled observer, replays the buffered notifications (and the
      /// terminal one, if any) into it and, while the sequence is still active, registers it for
      /// future notifications.
      /// </summary>
      /// <param name="observer">Observer to subscribe.</param>
      /// <returns>
      /// A subscription that removes the observer, or <see cref="Disposable.Empty"/> when the
      /// sequence has already terminated.
      /// </returns>
      public override IDisposable Subscribe(IObserver<T> observer)
      {
          var scheduled = CreateScheduledObserver(observer);
          var replayed = 0;
          IDisposable subscription = Disposable.Empty;

          lock (_gate)
          {
              CheckDisposed();

              //
              // The v1.x behavior of always calling Trim is deliberately preserved here.
              //
              // Whether this policy should only apply while the sequence is active is
              // debatable. As it stands, a sequence "dies out" after termination because
              // OnNext notifications keep being dropped from the queue.
              //
              // In v1.x this came from trimming based on the clock value returned by
              // scheduler.Now, applied to everything but the terminal message in the queue.
              // Using the IStopwatch has the same effect. Either way the final notification
              // is guaranteed to be observed, but the buffer itself cannot be retained
              // directly. One workaround is to apply an unbounded ReplaySubject to the
              // time-based TakeLast operator.
              //
              // The behavior is kept as-is for compatibility with v1.x.
              //
              Trim();
              replayed = Replay(scheduled);

              if (_error != null)
              {
                  replayed++;
                  scheduled.OnError(_error);
              }
              else if (_isStopped)
              {
                  replayed++;
                  scheduled.OnCompleted();
              }

              if (!_isStopped)
              {
                  subscription = new Subscription(this, scheduled);
                  _observers = _observers.Add(scheduled);
              }
          }

          // Drain outside the gate, passing the number of notifications queued above.
          scheduled.EnsureActive(replayed);

          return subscription;
      }

      /// <summary>
      /// Marks the subject as disposed, drops the observer list and lets the derived class
      /// release its buffer.
      /// </summary>
      public override void Dispose()
      {
          lock (_gate)
          {
              _isDisposed = true;
              _observers = null;
              DisposeCore();
          }
      }

      /// <summary>Releases the buffer of the derived class; called under the gate.</summary>
      protected abstract void DisposeCore();

      /// <summary>Adds a value to the buffer; called under the gate.</summary>
      protected abstract void Next(T value);

      /// <summary>Sends the buffered values to the observer and returns how many were sent; called under the gate.</summary>
      protected abstract int Replay(IObserver<T> observer);

      /// <summary>Applies the buffer trimming policy; called under the gate.</summary>
      protected abstract void Trim();

      /// <summary>Creates the scheduled observer that wraps a subscribing observer.</summary>
      protected abstract IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer);

      /// <summary>
      /// Activates every observer in the snapshot; does nothing when no snapshot was taken.
      /// </summary>
      private static void Activate(IScheduledObserver<T>[] targets)
      {
          if (targets == null)
          {
              return;
          }

          foreach (var target in targets)
          {
              target.EnsureActive();
          }
      }

      private void CheckDisposed()
      {
          if (_isDisposed)
          {
              throw new ObjectDisposedException(string.Empty);
          }
      }

      private void Unsubscribe(IScheduledObserver<T> observer)
      {
          lock (_gate)
          {
              // After disposal the list is null, so there is nothing to remove from.
              if (_isDisposed)
              {
                  return;
              }

              _observers = _observers.Remove(observer);
          }
      }

#if NOTYET // TODO: Expose internal notifications similar to BehaviorSubject<T>.TryGetValue?
      public bool TryGetNotifications(out IList<Notification<T>> notifications)
      {
          lock (_gate)
          {
              if (_isDisposed)
              {
                  notifications = null;
                  return false;
              }
              else
              {
                  var res = new List<Notification<T>>();
                  var materializer = Observer.Create<T>(
                      x => res.Add(Notification.CreateOnNext(x)),
                      ex => res.Add(Notification.CreateOnError<T>(ex)),
                      () => res.Add(Notification.CreateOnCompleted<T>())
                  );
                  Replay(materializer);
                  if (_error != null)
                  {
                      materializer.OnError(_error);
                  }
                  else if (_isStopped)
                  {
                      materializer.OnCompleted();
                  }
                  notifications = res;
                  return true;
              }
          }
      }
#endif

      /// <summary>
      /// Handle returned from <see cref="Subscribe"/>; disposing it stops the scheduled observer
      /// and removes it from the subject.
      /// </summary>
      private sealed class Subscription : IDisposable
      {
          private readonly ReplayBase _subject;
          private readonly IScheduledObserver<T> _observer;

          public Subscription(ReplayBase subject, IScheduledObserver<T> observer)
          {
              _observer = observer;
              _subject = subject;
          }

          public void Dispose()
          {
              _observer.Dispose();
              _subject.Unsubscribe(_observer);
          }
      }
  }
  /// <summary>
  /// Original implementation of the ReplaySubject with time based operations (scheduling,
  /// stopwatch, buffer-by-time). Every buffered value is stamped with the stopwatch reading
  /// taken when it arrived.
  /// </summary>
  private sealed class ReplayByTime : ReplayBase
  {
      private const int InfiniteBufferSize = int.MaxValue;

      private readonly int _bufferSize;
      private readonly TimeSpan _window;
      private readonly IScheduler _scheduler;
      private readonly IStopwatch _stopwatch;
      private readonly Queue<TimeInterval<T>> _queue;

      /// <summary>
      /// Creates a buffer limited by both element count and age, notifying on the given scheduler.
      /// </summary>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is negative, or <paramref name="window"/> is less than TimeSpan.Zero.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
      public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
      {
          if (bufferSize < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(bufferSize));
          }

          if (window < TimeSpan.Zero)
          {
              throw new ArgumentOutOfRangeException(nameof(window));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          _bufferSize = bufferSize;
          _window = window;
          _scheduler = scheduler;

          // The stopwatch starts at construction; item ages are measured against it.
          _stopwatch = _scheduler.StartStopwatch();
          _queue = new Queue<TimeInterval<T>>();
      }

      public ReplayByTime(int bufferSize, TimeSpan window)
          : this(bufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      public ReplayByTime(IScheduler scheduler)
          : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      public ReplayByTime(int bufferSize, IScheduler scheduler)
          : this(bufferSize, TimeSpan.MaxValue, scheduler)
      {
      }

      public ReplayByTime(TimeSpan window, IScheduler scheduler)
          : this(InfiniteBufferSize, window, scheduler)
      {
      }

      public ReplayByTime(TimeSpan window)
          : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
      {
      }

      /// <summary>Wraps the observer so it is notified through the configured scheduler.</summary>
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
      {
          var scheduled = new ScheduledObserver<T>(_scheduler, observer);
          return scheduled;
      }

      /// <summary>Empties the buffer.</summary>
      protected override void DisposeCore()
      {
          _queue.Clear();
      }

      /// <summary>Appends the value together with the current stopwatch reading.</summary>
      protected override void Next(T value)
      {
          var stamped = new TimeInterval<T>(value, _stopwatch.Elapsed);
          _queue.Enqueue(stamped);
      }

      /// <summary>Sends every buffered value to the observer, oldest first.</summary>
      /// <returns>The number of values that were in the buffer.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          var buffered = _queue.Count;

          foreach (var entry in _queue)
          {
              observer.OnNext(entry.Value);
          }

          return buffered;
      }

      /// <summary>
      /// Drops the oldest entries until the count limit holds, then drops every entry whose age
      /// exceeds the window.
      /// </summary>
      protected override void Trim()
      {
          var now = _stopwatch.Elapsed;

          while (_queue.Count > _bufferSize)
          {
              _queue.Dequeue();
          }

          while (_queue.Count > 0 && now - _queue.Peek().Interval > _window)
          {
              _queue.Dequeue();
          }
      }
  }
  488. //
  489. // Below are the non-time based implementations.
  490. // These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
  491. // The ReplayOne implementation also removes the need to even have a queue.
  492. //
  /// <summary>
  /// Replay implementation that remembers only the most recent value, stored in a single field
  /// instead of a queue.
  /// </summary>
  private sealed class ReplayOne : ReplayBufferBase
  {
      private bool _hasValue;
      private T _value;

      /// <summary>Nothing to trim: at most one value is ever held.</summary>
      protected override void Trim()
      {
      }

      /// <summary>Overwrites the remembered value.</summary>
      protected override void Next(T value)
      {
          _value = value;
          _hasValue = true;
      }

      /// <summary>Sends the remembered value, if there is one.</summary>
      /// <returns>1 when a value was sent; otherwise 0.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          if (!_hasValue)
          {
              return 0;
          }

          observer.OnNext(_value);
          return 1;
      }

      /// <summary>Resets the stored value to its default so it is no longer referenced.</summary>
      protected override void DisposeCore()
      {
          _value = default(T);
      }
  }
  /// <summary>
  /// Queue-backed replay implementation that keeps at most a fixed number of values.
  /// </summary>
  private sealed class ReplayMany : ReplayManyBase
  {
      private readonly int _bufferSize;

      public ReplayMany(int bufferSize)
          : base(bufferSize)
      {
          _bufferSize = bufferSize;
      }

      /// <summary>Drops the oldest values until no more than the buffer size remain.</summary>
      protected override void Trim()
      {
          for (var excess = _queue.Count - _bufferSize; excess > 0; excess--)
          {
              _queue.Dequeue();
          }
      }
  }
  /// <summary>
  /// Queue-backed replay implementation that never discards values.
  /// </summary>
  private sealed class ReplayAll : ReplayManyBase
  {
      public ReplayAll()
          : base(queueSize: 0)
      {
      }

      /// <summary>Intentionally empty: every value is kept.</summary>
      protected override void Trim()
      {
      }
  }
  /// <summary>
  /// Base of the non-time-based implementations: observers are wrapped in a
  /// <see cref="FastImmediateObserver{T}"/> and there is nothing to release on disposal by default.
  /// </summary>
  private abstract class ReplayBufferBase : ReplayBase
  {
      protected override IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
      {
          var wrapped = new FastImmediateObserver<T>(observer);
          return wrapped;
      }

      /// <summary>No resources to release at this level.</summary>
      protected override void DisposeCore()
      {
      }
  }
  /// <summary>
  /// Base of the queue-backed implementations; derived classes only decide how to trim.
  /// </summary>
  private abstract class ReplayManyBase : ReplayBufferBase
  {
      protected readonly Queue<T> _queue;

      /// <summary>
      /// Creates the queue with an initial capacity of <paramref name="queueSize"/>, capped at 64.
      /// </summary>
      protected ReplayManyBase(int queueSize)
          : base()
      {
          var initialCapacity = Math.Min(queueSize, 64);
          _queue = new Queue<T>(initialCapacity);
      }

      /// <summary>Appends the value to the queue.</summary>
      protected override void Next(T value)
      {
          _queue.Enqueue(value);
      }

      /// <summary>Sends every queued value to the observer, oldest first.</summary>
      /// <returns>The number of values sent.</returns>
      protected override int Replay(IObserver<T> observer)
      {
          var sent = 0;

          foreach (var item in _queue)
          {
              observer.OnNext(item);
              sent++;
          }

          return sent;
      }

      /// <summary>Empties the queue.</summary>
      protected override void DisposeCore()
      {
          _queue.Clear();
      }
  }
  584. }
  /// <summary>
  /// Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
  /// Producers enqueue notifications under a gate; whichever thread calls
  /// <see cref="EnsureActive()"/> while nobody is draining becomes the owner and forwards the
  /// queued notifications to the wrapped observer.
  /// </summary>
  /// <typeparam name="T">Type of the elements processed by the observer.</typeparam>
  class FastImmediateObserver<T> : IScheduledObserver<T>
  {
      /// <summary>
      /// Gate to control ownership transfer and protect data structures.
      /// </summary>
      private readonly object _gate = new object();

      /// <summary>
      /// Observer to forward notifications to; swapped for a no-op observer by <see cref="Done"/>.
      /// </summary>
      private volatile IObserver<T> _observer;

      /// <summary>
      /// Queue that producers enqueue OnNext notifications into.
      /// </summary>
      private Queue<T> _queue = new Queue<T>();

      /// <summary>
      /// Standby queue swapped in for _queue when the owner takes the pending notifications.
      /// Recycling it avoids allocating a queue per drain on busy subjects where the initial
      /// replay doesn't suffice to catch up.
      /// </summary>
      private Queue<T> _queue2;

      /// <summary>
      /// Exception passed to an OnError notification, if any.
      /// </summary>
      private Exception _error;

      /// <summary>
      /// Indicates whether an OnCompleted notification was received.
      /// </summary>
      private bool _done;

      /// <summary>
      /// Indicates whether some thread is actively draining the queued notifications.
      /// </summary>
      private bool _busy;

      /// <summary>
      /// Indicates whether draining failed at some point; once set, all further work is dropped.
      /// </summary>
      private bool _hasFaulted;

      /// <summary>
      /// Creates a new scheduled observer that proxies to the specified observer.
      /// </summary>
      /// <param name="observer">Observer to forward notifications to.</param>
      public FastImmediateObserver(IObserver<T> observer)
      {
          _observer = observer;
      }

      /// <summary>
      /// Disposes the observer by replacing the wrapped observer with a no-op one.
      /// </summary>
      public void Dispose()
      {
          Done();
      }

      /// <summary>
      /// Notifies the observer of pending work. Either the current owner picks up the newly
      /// enqueued notifications, or the calling thread becomes the owner and drains the queue.
      /// </summary>
      public void EnsureActive()
      {
          EnsureActive(1);
      }

      /// <summary>
      /// Notifies the observer of pending work. Either the current owner picks up the newly
      /// enqueued notifications, or the calling thread becomes the owner and drains the queue.
      /// </summary>
      /// <param name="count">The number of enqueued notifications to process (ignored).</param>
      public void EnsureActive(int count)
      {
          bool becameOwner;

          lock (_gate)
          {
              //
              // Work is dropped after a past failure; otherwise ownership is taken only
              // when no other thread is currently draining.
              //
              becameOwner = !_hasFaulted && !_busy;
              if (becameOwner)
              {
                  _busy = true;
              }
          }

          if (!becameOwner)
          {
              return;
          }

          for (;;)
          {
              Queue<T> pending = null;
              Exception failure = null;
              var completed = false;

              //
              // Steal notifications from the producer side to drain them to the observer.
              //
              lock (_gate)
              {
                  if (_queue.Count > 0)
                  {
                      //
                      // Swap the filled queue for the standby one (or a fresh one). The
                      // standby slot is emptied; the stolen queue is put back there once
                      // its notifications have been sent out.
                      //
                      pending = _queue;
                      _queue = _queue2 ?? new Queue<T>();
                      _queue2 = null;
                  }

                  if (_error != null)
                  {
                      failure = _error;
                  }
                  else if (_done)
                  {
                      completed = true;
                  }
                  else if (pending == null)
                  {
                      //
                      // No work left; give up ownership so another thread can become the
                      // owner in the future.
                      //
                      _busy = false;
                      return;
                  }
              }

              try
              {
                  if (pending != null)
                  {
                      while (pending.Count > 0)
                      {
                          _observer.OnNext(pending.Dequeue());
                      }

                      //
                      // The stolen queue is empty now; recycle it as the standby queue.
                      //
                      lock (_gate)
                      {
                          _queue2 = pending;
                      }
                  }

                  //
                  // Terminal notifications end the loop without releasing ownership, so no
                  // further processing by this observer takes place.
                  //
                  if (failure != null)
                  {
                      var target = Done();
                      target.OnError(failure);
                      return;
                  }

                  if (completed)
                  {
                      var target = Done();
                      target.OnCompleted();
                      return;
                  }
              }
              catch
              {
                  lock (_gate)
                  {
                      _hasFaulted = true;
                      _queue.Clear();
                  }

                  throw;
              }
          }
      }

      /// <summary>
      /// Enqueues an OnCompleted notification.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _done = true;
          }
      }

      /// <summary>
      /// Enqueues an OnError notification.
      /// </summary>
      /// <param name="error">Error of the notification.</param>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _error = error;
          }
      }

      /// <summary>
      /// Enqueues an OnNext notification.
      /// </summary>
      /// <param name="value">Value of the notification.</param>
      public void OnNext(T value)
      {
          lock (_gate)
          {
              if (_hasFaulted)
              {
                  return;
              }

              _queue.Enqueue(value);
          }
      }

      /// <summary>
      /// Terminates the observer by atomically swapping in a no-op observer, thus preventing
      /// future notifications from going out.
      /// </summary>
      /// <returns>The previously wrapped observer, to send terminal notifications to.</returns>
      private IObserver<T> Done()
      {
#pragma warning disable 0420
          return Interlocked.Exchange(ref _observer, NopObserver<T>.Instance);
#pragma warning restore 0420
      }
  }
  833. }