|
|
@@ -7,117 +7,123 @@ using System.Reactive.Concurrency;
|
|
|
|
|
|
namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
- internal sealed class SkipLast<TSource> : Producer<TSource>
|
|
|
+ internal static class SkipLast<TSource>
|
|
|
{
|
|
|
- private readonly IObservable<TSource> _source;
|
|
|
- private readonly int _count;
|
|
|
- private readonly TimeSpan _duration;
|
|
|
- private readonly IScheduler _scheduler;
|
|
|
-
|
|
|
- public SkipLast(IObservable<TSource> source, int count)
|
|
|
- {
|
|
|
- _source = source;
|
|
|
- _count = count;
|
|
|
- }
|
|
|
-
|
|
|
- public SkipLast(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
|
|
|
- {
|
|
|
- _source = source;
|
|
|
- _duration = duration;
|
|
|
- _scheduler = scheduler;
|
|
|
- }
|
|
|
-
|
|
|
- protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
- {
|
|
|
- if (_scheduler == null)
|
|
|
- {
|
|
|
- var sink = new _(this, observer, cancel);
|
|
|
- setSink(sink);
|
|
|
- return _source.SubscribeSafe(sink);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- var sink = new SkipLastImpl(this, observer, cancel);
|
|
|
- setSink(sink);
|
|
|
- return sink.Run();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- class _ : Sink<TSource>, IObserver<TSource>
|
|
|
+ internal sealed class Count : Producer<TSource>
|
|
|
{
|
|
|
- private readonly SkipLast<TSource> _parent;
|
|
|
- private Queue<TSource> _queue;
|
|
|
-
|
|
|
- public _(SkipLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
- : base(observer, cancel)
|
|
|
- {
|
|
|
- _parent = parent;
|
|
|
- _queue = new Queue<TSource>();
|
|
|
- }
|
|
|
+ private readonly IObservable<TSource> _source;
|
|
|
+ private readonly int _count;
|
|
|
|
|
|
- public void OnNext(TSource value)
|
|
|
+ public Count(IObservable<TSource> source, int count)
|
|
|
{
|
|
|
- _queue.Enqueue(value);
|
|
|
- if (_queue.Count > _parent._count)
|
|
|
- base._observer.OnNext(_queue.Dequeue());
|
|
|
+ _source = source;
|
|
|
+ _count = count;
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
+ var sink = new _(_count, observer, cancel);
|
|
|
+ setSink(sink);
|
|
|
+ return _source.SubscribeSafe(sink);
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
+ private int _count;
|
|
|
+ private Queue<TSource> _queue;
|
|
|
+
|
|
|
+ public _(int count, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ : base(observer, cancel)
|
|
|
+ {
|
|
|
+ _count = count;
|
|
|
+ _queue = new Queue<TSource>();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void OnNext(TSource value)
|
|
|
+ {
|
|
|
+ _queue.Enqueue(value);
|
|
|
+ if (_queue.Count > _count)
|
|
|
+ base._observer.OnNext(_queue.Dequeue());
|
|
|
+ }
|
|
|
+
|
|
|
+ public void OnError(Exception error)
|
|
|
+ {
|
|
|
+ base._observer.OnError(error);
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void OnCompleted()
|
|
|
+ {
|
|
|
+ base._observer.OnCompleted();
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- class SkipLastImpl : Sink<TSource>, IObserver<TSource>
|
|
|
+ internal sealed class Time : Producer<TSource>
|
|
|
{
|
|
|
- private readonly SkipLast<TSource> _parent;
|
|
|
- private Queue<System.Reactive.TimeInterval<TSource>> _queue;
|
|
|
-
|
|
|
- public SkipLastImpl(SkipLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
- : base(observer, cancel)
|
|
|
- {
|
|
|
- _parent = parent;
|
|
|
- _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
|
|
|
- }
|
|
|
-
|
|
|
- private IStopwatch _watch;
|
|
|
-
|
|
|
- public IDisposable Run()
|
|
|
- {
|
|
|
- _watch = _parent._scheduler.StartStopwatch();
|
|
|
+ private readonly IObservable<TSource> _source;
|
|
|
+ private readonly TimeSpan _duration;
|
|
|
+ private readonly IScheduler _scheduler;
|
|
|
|
|
|
- return _parent._source.SubscribeSafe(this);
|
|
|
- }
|
|
|
-
|
|
|
- public void OnNext(TSource value)
|
|
|
+ public Time(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
|
|
|
{
|
|
|
- var now = _watch.Elapsed;
|
|
|
- _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, now));
|
|
|
- while (_queue.Count > 0 && now - _queue.Peek().Interval >= _parent._duration)
|
|
|
- base._observer.OnNext(_queue.Dequeue().Value);
|
|
|
+ _source = source;
|
|
|
+ _duration = duration;
|
|
|
+ _scheduler = scheduler;
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
+ var sink = new _(_duration, observer, cancel);
|
|
|
+ setSink(sink);
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- var now = _watch.Elapsed;
|
|
|
- while (_queue.Count > 0 && now - _queue.Peek().Interval >= _parent._duration)
|
|
|
- base._observer.OnNext(_queue.Dequeue().Value);
|
|
|
-
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
+ private readonly TimeSpan _duration;
|
|
|
+ private Queue<System.Reactive.TimeInterval<TSource>> _queue;
|
|
|
+
|
|
|
+ public _(TimeSpan duration, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ : base(observer, cancel)
|
|
|
+ {
|
|
|
+ _duration = duration;
|
|
|
+ _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
|
|
|
+ }
|
|
|
+
|
|
|
+ private IStopwatch _watch;
|
|
|
+
|
|
|
+ public IDisposable Run(Time parent)
|
|
|
+ {
|
|
|
+ _watch = parent._scheduler.StartStopwatch();
|
|
|
+
|
|
|
+ return parent._source.SubscribeSafe(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void OnNext(TSource value)
|
|
|
+ {
|
|
|
+ var now = _watch.Elapsed;
|
|
|
+ _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, now));
|
|
|
+ while (_queue.Count > 0 && now - _queue.Peek().Interval >= _duration)
|
|
|
+ base._observer.OnNext(_queue.Dequeue().Value);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void OnError(Exception error)
|
|
|
+ {
|
|
|
+ base._observer.OnError(error);
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void OnCompleted()
|
|
|
+ {
|
|
|
+ var now = _watch.Elapsed;
|
|
|
+ while (_queue.Count > 0 && now - _queue.Peek().Interval >= _duration)
|
|
|
+ base._observer.OnNext(_queue.Dequeue().Value);
|
|
|
+
|
|
|
+ base._observer.OnCompleted();
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|