|
|
@@ -110,7 +110,45 @@ namespace System.Reactive.Linq
|
|
|
if (clock == null)
|
|
|
throw new ArgumentNullException(nameof(clock));
|
|
|
|
|
|
- throw new NotImplementedException();
|
|
|
+ var queue = new Queue<Timestamped<TSource>>();
|
|
|
+
|
|
|
+ void Trim(DateTimeOffset now)
|
|
|
+ {
|
|
|
+ while (queue.Count > 0 && now - queue.Peek().Timestamp >= duration)
|
|
|
+ {
|
|
|
+ queue.Dequeue();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return Create<TSource>(
|
|
|
+ x =>
|
|
|
+ {
|
|
|
+ var now = clock.Now;
|
|
|
+
|
|
|
+ queue.Enqueue(new Timestamped<TSource>(x, now));
|
|
|
+
|
|
|
+ Trim(now);
|
|
|
+
|
|
|
+ return Task.CompletedTask;
|
|
|
+ },
|
|
|
+ observer.OnErrorAsync,
|
|
|
+ async () =>
|
|
|
+ {
|
|
|
+ Trim(clock.Now);
|
|
|
+
|
|
|
+ var n = queue.Count;
|
|
|
+
|
|
|
+ var res = new List<TSource>(n);
|
|
|
+
|
|
|
+ while (queue.Count > 0)
|
|
|
+ {
|
|
|
+ res.Add(queue.Dequeue().Value);
|
|
|
+ }
|
|
|
+
|
|
|
+ await observer.OnNextAsync(res).ConfigureAwait(false);
|
|
|
+ await observer.OnCompletedAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
}
|