|
@@ -200,7 +200,53 @@ namespace System.Reactive.Linq
|
|
|
if (scheduler == null)
|
|
|
throw new ArgumentNullException(nameof(scheduler));
|
|
|
|
|
|
- throw new NotImplementedException();
|
|
|
+ var sad = new SingleAssignmentAsyncDisposable();
|
|
|
+
|
|
|
+ var queue = new Queue<Timestamped<TSource>>();
|
|
|
+
|
|
|
+ void Trim(DateTimeOffset now)
|
|
|
+ {
|
|
|
+ while (queue.Count > 0 && now - queue.Peek().Timestamp >= duration)
|
|
|
+ {
|
|
|
+ queue.Dequeue();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return
|
|
|
+ (
|
|
|
+ Create<TSource>(
|
|
|
+ x =>
|
|
|
+ {
|
|
|
+ var now = clock.Now;
|
|
|
+
|
|
|
+ queue.Enqueue(new Timestamped<TSource>(x, now));
|
|
|
+
|
|
|
+ Trim(now);
|
|
|
+
|
|
|
+ return Task.CompletedTask;
|
|
|
+ },
|
|
|
+ observer.OnErrorAsync,
|
|
|
+ async () =>
|
|
|
+ {
|
|
|
+ Trim(clock.Now);
|
|
|
+
|
|
|
+ var drain = await scheduler.ScheduleAsync(async ct =>
|
|
|
+ {
|
|
|
+ while (!ct.IsCancellationRequested && queue.Count > 0)
|
|
|
+ {
|
|
|
+ await observer.OnNextAsync(queue.Dequeue().Value).RendezVous(scheduler);
|
|
|
+ }
|
|
|
+
|
|
|
+ ct.ThrowIfCancellationRequested();
|
|
|
+
|
|
|
+ await observer.OnCompletedAsync().RendezVous(scheduler);
|
|
|
+ }).ConfigureAwait(false);
|
|
|
+
|
|
|
+ await sad.AssignAsync(drain).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ sad
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
}
|