|
@@ -57,12 +57,31 @@ namespace System.Reactive.Linq
|
|
|
return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeLast(observer, duration)));
|
|
|
}
|
|
|
|
|
|
- public static IAsyncObservable<TSource> TakeLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IAsyncScheduler scheduler)
|
|
|
+ public static IAsyncObservable<TSource> TakeLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IClock clock)
|
|
|
{
|
|
|
if (source == null)
|
|
|
throw new ArgumentNullException(nameof(source));
|
|
|
if (duration < TimeSpan.Zero)
|
|
|
throw new ArgumentOutOfRangeException(nameof(duration));
|
|
|
+ if (clock == null)
|
|
|
+ throw new ArgumentNullException(nameof(clock));
|
|
|
+
|
|
|
+ if (duration == TimeSpan.Zero)
|
|
|
+ {
|
|
|
+ return Empty<TSource>();
|
|
|
+ }
|
|
|
+
|
|
|
+ return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeLast(observer, duration, clock)));
|
|
|
+ }
|
|
|
+
|
|
|
+ public static IAsyncObservable<TSource> TakeLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IClock clock, IAsyncScheduler scheduler)
|
|
|
+ {
|
|
|
+ if (source == null)
|
|
|
+ throw new ArgumentNullException(nameof(source));
|
|
|
+ if (duration < TimeSpan.Zero)
|
|
|
+ throw new ArgumentOutOfRangeException(nameof(duration));
|
|
|
+ if (clock == null)
|
|
|
+ throw new ArgumentNullException(nameof(clock));
|
|
|
if (scheduler == null)
|
|
|
throw new ArgumentNullException(nameof(scheduler));
|
|
|
|
|
@@ -71,8 +90,10 @@ namespace System.Reactive.Linq
|
|
|
return Empty<TSource>();
|
|
|
}
|
|
|
|
|
|
- return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeLast(observer, duration, scheduler)));
|
|
|
+ return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeLast(observer, duration, clock, scheduler)));
|
|
|
}
|
|
|
+
|
|
|
+ public static IAsyncObservable<TSource> TakeLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IAsyncScheduler scheduler) => TakeLast(source, duration, scheduler, scheduler);
|
|
|
}
|
|
|
|
|
|
partial class AsyncObserver
|
|
@@ -125,9 +146,13 @@ namespace System.Reactive.Linq
|
|
|
throw new NotImplementedException();
|
|
|
}
|
|
|
|
|
|
- public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration) => TakeLast(observer, duration, TaskPoolAsyncScheduler.Default);
|
|
|
+ public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration) => TakeLast(observer, duration, TaskPoolAsyncScheduler.Default, TaskPoolAsyncScheduler.Default);
|
|
|
+
|
|
|
+ public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IAsyncScheduler scheduler) => TakeLast(observer, duration, scheduler, scheduler);
|
|
|
+
|
|
|
+ public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IClock clock) => TakeLast(observer, duration, clock, TaskPoolAsyncScheduler.Default);
|
|
|
|
|
|
- public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IAsyncScheduler scheduler)
|
|
|
+ public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IClock clock, IAsyncScheduler scheduler)
|
|
|
{
|
|
|
if (observer == null)
|
|
|
throw new ArgumentNullException(nameof(observer));
|