TakeLast.cs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Disposables;
  9. namespace System.Reactive.Linq.ObservableImpl
  10. {
  11. class TakeLast<TSource> : Producer<TSource>
  12. {
  13. private readonly IObservable<TSource> _source;
  14. private readonly int _count;
  15. private readonly TimeSpan _duration;
  16. private readonly IScheduler _scheduler;
  17. private readonly IScheduler _loopScheduler;
  18. public TakeLast(IObservable<TSource> source, int count, IScheduler loopScheduler)
  19. {
  20. _source = source;
  21. _count = count;
  22. _loopScheduler = loopScheduler;
  23. }
  24. public TakeLast(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler, IScheduler loopScheduler)
  25. {
  26. _source = source;
  27. _duration = duration;
  28. _scheduler = scheduler;
  29. _loopScheduler = loopScheduler;
  30. }
  31. protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  32. {
  33. if (_scheduler == null)
  34. {
  35. var sink = new _(this, observer, cancel);
  36. setSink(sink);
  37. return sink.Run();
  38. }
  39. else
  40. {
  41. var sink = new TakeLastImpl(this, observer, cancel);
  42. setSink(sink);
  43. return sink.Run();
  44. }
  45. }
  46. class _ : Sink<TSource>, IObserver<TSource>
  47. {
  48. private readonly TakeLast<TSource> _parent;
  49. private Queue<TSource> _queue;
  50. public _(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  51. : base(observer, cancel)
  52. {
  53. _parent = parent;
  54. _queue = new Queue<TSource>();
  55. }
  56. private SingleAssignmentDisposable _subscription;
  57. private SingleAssignmentDisposable _loop;
  58. public IDisposable Run()
  59. {
  60. _subscription = new SingleAssignmentDisposable();
  61. _loop = new SingleAssignmentDisposable();
  62. _subscription.Disposable = _parent._source.SubscribeSafe(this);
  63. return StableCompositeDisposable.Create(_subscription, _loop);
  64. }
  65. public void OnNext(TSource value)
  66. {
  67. _queue.Enqueue(value);
  68. if (_queue.Count > _parent._count)
  69. _queue.Dequeue();
  70. }
  71. public void OnError(Exception error)
  72. {
  73. base._observer.OnError(error);
  74. base.Dispose();
  75. }
  76. public void OnCompleted()
  77. {
  78. _subscription.Dispose();
  79. var longRunning = _parent._loopScheduler.AsLongRunning();
  80. if (longRunning != null)
  81. _loop.Disposable = longRunning.ScheduleLongRunning(Loop);
  82. else
  83. _loop.Disposable = _parent._loopScheduler.Schedule(LoopRec);
  84. }
  85. private void LoopRec(Action recurse)
  86. {
  87. if (_queue.Count > 0)
  88. {
  89. base._observer.OnNext(_queue.Dequeue());
  90. recurse();
  91. }
  92. else
  93. {
  94. base._observer.OnCompleted();
  95. base.Dispose();
  96. }
  97. }
  98. private void Loop(ICancelable cancel)
  99. {
  100. var n = _queue.Count;
  101. while (!cancel.IsDisposed)
  102. {
  103. if (n == 0)
  104. {
  105. base._observer.OnCompleted();
  106. break;
  107. }
  108. else
  109. base._observer.OnNext(_queue.Dequeue());
  110. n--;
  111. }
  112. base.Dispose();
  113. }
  114. }
  115. class TakeLastImpl : Sink<TSource>, IObserver<TSource>
  116. {
  117. private readonly TakeLast<TSource> _parent;
  118. private Queue<System.Reactive.TimeInterval<TSource>> _queue;
  119. public TakeLastImpl(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  120. : base(observer, cancel)
  121. {
  122. _parent = parent;
  123. _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
  124. }
  125. private SingleAssignmentDisposable _subscription;
  126. private SingleAssignmentDisposable _loop;
  127. private IStopwatch _watch;
  128. public IDisposable Run()
  129. {
  130. _subscription = new SingleAssignmentDisposable();
  131. _loop = new SingleAssignmentDisposable();
  132. _watch = _parent._scheduler.StartStopwatch();
  133. _subscription.Disposable = _parent._source.SubscribeSafe(this);
  134. return StableCompositeDisposable.Create(_subscription, _loop);
  135. }
  136. public void OnNext(TSource value)
  137. {
  138. var now = _watch.Elapsed;
  139. _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, now));
  140. Trim(now);
  141. }
  142. public void OnError(Exception error)
  143. {
  144. base._observer.OnError(error);
  145. base.Dispose();
  146. }
  147. public void OnCompleted()
  148. {
  149. _subscription.Dispose();
  150. var now = _watch.Elapsed;
  151. Trim(now);
  152. var longRunning = _parent._loopScheduler.AsLongRunning();
  153. if (longRunning != null)
  154. _loop.Disposable = longRunning.ScheduleLongRunning(Loop);
  155. else
  156. _loop.Disposable = _parent._loopScheduler.Schedule(LoopRec);
  157. }
  158. private void LoopRec(Action recurse)
  159. {
  160. if (_queue.Count > 0)
  161. {
  162. base._observer.OnNext(_queue.Dequeue().Value);
  163. recurse();
  164. }
  165. else
  166. {
  167. base._observer.OnCompleted();
  168. base.Dispose();
  169. }
  170. }
  171. private void Loop(ICancelable cancel)
  172. {
  173. var n = _queue.Count;
  174. while (!cancel.IsDisposed)
  175. {
  176. if (n == 0)
  177. {
  178. base._observer.OnCompleted();
  179. break;
  180. }
  181. else
  182. base._observer.OnNext(_queue.Dequeue().Value);
  183. n--;
  184. }
  185. base.Dispose();
  186. }
  187. private void Trim(TimeSpan now)
  188. {
  189. while (_queue.Count > 0 && now - _queue.Peek().Interval >= _parent._duration)
  190. _queue.Dequeue();
  191. }
  192. }
  193. }
  194. }
  195. #endif