TakeLast.cs 7.0 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. namespace System.Reactive.Linq.Observαble
  8. {
  9. class TakeLast<TSource> : Producer<TSource>
  10. {
  11. private readonly IObservable<TSource> _source;
  12. private readonly int _count;
  13. private readonly TimeSpan _duration;
  14. private readonly IScheduler _scheduler;
  15. private readonly IScheduler _loopScheduler;
  16. public TakeLast(IObservable<TSource> source, int count, IScheduler loopScheduler)
  17. {
  18. _source = source;
  19. _count = count;
  20. _loopScheduler = loopScheduler;
  21. }
  22. public TakeLast(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler, IScheduler loopScheduler)
  23. {
  24. _source = source;
  25. _duration = duration;
  26. _scheduler = scheduler;
  27. _loopScheduler = loopScheduler;
  28. }
  29. protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  30. {
  31. if (_scheduler == null)
  32. {
  33. var sink = new _(this, observer, cancel);
  34. setSink(sink);
  35. return sink.Run();
  36. }
  37. else
  38. {
  39. var sink = new τ(this, observer, cancel);
  40. setSink(sink);
  41. return sink.Run();
  42. }
  43. }
  44. class _ : Sink<TSource>, IObserver<TSource>
  45. {
  46. private readonly TakeLast<TSource> _parent;
  47. private Queue<TSource> _queue;
  48. public _(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  49. : base(observer, cancel)
  50. {
  51. _parent = parent;
  52. _queue = new Queue<TSource>();
  53. }
  54. private SingleAssignmentDisposable _subscription;
  55. private SingleAssignmentDisposable _loop;
  56. public IDisposable Run()
  57. {
  58. _subscription = new SingleAssignmentDisposable();
  59. _loop = new SingleAssignmentDisposable();
  60. _subscription.Disposable = _parent._source.SubscribeSafe(this);
  61. return new CompositeDisposable(_subscription, _loop);
  62. }
  63. public void OnNext(TSource value)
  64. {
  65. _queue.Enqueue(value);
  66. if (_queue.Count > _parent._count)
  67. _queue.Dequeue();
  68. }
  69. public void OnError(Exception error)
  70. {
  71. base._observer.OnError(error);
  72. base.Dispose();
  73. }
  74. public void OnCompleted()
  75. {
  76. _subscription.Dispose();
  77. var longRunning = _parent._loopScheduler.AsLongRunning();
  78. if (longRunning != null)
  79. _loop.Disposable = longRunning.ScheduleLongRunning(Loop);
  80. else
  81. _loop.Disposable = _parent._loopScheduler.Schedule(LoopRec);
  82. }
  83. private void LoopRec(Action recurse)
  84. {
  85. if (_queue.Count > 0)
  86. {
  87. base._observer.OnNext(_queue.Dequeue());
  88. recurse();
  89. }
  90. else
  91. {
  92. base._observer.OnCompleted();
  93. base.Dispose();
  94. }
  95. }
  96. private void Loop(ICancelable cancel)
  97. {
  98. var n = _queue.Count;
  99. while (!cancel.IsDisposed)
  100. {
  101. if (n == 0)
  102. {
  103. base._observer.OnCompleted();
  104. break;
  105. }
  106. else
  107. base._observer.OnNext(_queue.Dequeue());
  108. n--;
  109. }
  110. base.Dispose();
  111. }
  112. }
  113. class τ : Sink<TSource>, IObserver<TSource>
  114. {
  115. private readonly TakeLast<TSource> _parent;
  116. private Queue<System.Reactive.TimeInterval<TSource>> _queue;
  117. public τ(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  118. : base(observer, cancel)
  119. {
  120. _parent = parent;
  121. _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
  122. }
  123. private SingleAssignmentDisposable _subscription;
  124. private SingleAssignmentDisposable _loop;
  125. private IStopwatch _watch;
  126. public IDisposable Run()
  127. {
  128. _subscription = new SingleAssignmentDisposable();
  129. _loop = new SingleAssignmentDisposable();
  130. _watch = _parent._scheduler.StartStopwatch();
  131. _subscription.Disposable = _parent._source.SubscribeSafe(this);
  132. return new CompositeDisposable(_subscription, _loop);
  133. }
  134. public void OnNext(TSource value)
  135. {
  136. var now = _watch.Elapsed;
  137. _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, now));
  138. Trim(now);
  139. }
  140. public void OnError(Exception error)
  141. {
  142. base._observer.OnError(error);
  143. base.Dispose();
  144. }
  145. public void OnCompleted()
  146. {
  147. _subscription.Dispose();
  148. var now = _watch.Elapsed;
  149. Trim(now);
  150. var longRunning = _parent._loopScheduler.AsLongRunning();
  151. if (longRunning != null)
  152. _loop.Disposable = longRunning.ScheduleLongRunning(Loop);
  153. else
  154. _loop.Disposable = _parent._loopScheduler.Schedule(LoopRec);
  155. }
  156. private void LoopRec(Action recurse)
  157. {
  158. if (_queue.Count > 0)
  159. {
  160. base._observer.OnNext(_queue.Dequeue().Value);
  161. recurse();
  162. }
  163. else
  164. {
  165. base._observer.OnCompleted();
  166. base.Dispose();
  167. }
  168. }
  169. private void Loop(ICancelable cancel)
  170. {
  171. var n = _queue.Count;
  172. while (!cancel.IsDisposed)
  173. {
  174. if (n == 0)
  175. {
  176. base._observer.OnCompleted();
  177. break;
  178. }
  179. else
  180. base._observer.OnNext(_queue.Dequeue().Value);
  181. n--;
  182. }
  183. base.Dispose();
  184. }
  185. private void Trim(TimeSpan now)
  186. {
  187. while (_queue.Count > 0 && now - _queue.Peek().Interval >= _parent._duration)
  188. _queue.Dequeue();
  189. }
  190. }
  191. }
  192. }
  193. #endif