|
|
@@ -96,7 +96,33 @@ namespace System.Reactive.Linq
|
|
|
if (clock == null)
|
|
|
throw new ArgumentNullException(nameof(clock));
|
|
|
|
|
|
- throw new NotImplementedException();
|
|
|
+ var queue = new Queue<Timestamped<TSource>>();
|
|
|
+
|
|
|
+ async Task FlushAsync(DateTimeOffset now)
|
|
|
+ {
|
|
|
+ while (queue.Count > 0 && now - queue.Peek().Timestamp >= duration)
|
|
|
+ {
|
|
|
+ await observer.OnNextAsync(queue.Dequeue().Value).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return Create<TSource>(
|
|
|
+ async x =>
|
|
|
+ {
|
|
|
+ var now = clock.Now;
|
|
|
+
|
|
|
+ queue.Enqueue(new Timestamped<TSource>(x, now));
|
|
|
+
|
|
|
+ await FlushAsync(now).ConfigureAwait(false);
|
|
|
+ },
|
|
|
+ observer.OnErrorAsync,
|
|
|
+ async () =>
|
|
|
+ {
|
|
|
+ await FlushAsync(clock.Now).ConfigureAwait(false);
|
|
|
+
|
|
|
+ await observer.OnCompletedAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
}
|