|
@@ -6,39 +6,50 @@ using System.Reactive.Concurrency;
|
|
|
|
|
|
|
|
namespace System.Reactive.Linq.ObservableImpl
|
|
namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
{
|
|
|
- internal sealed class DelaySubscription<TSource> : Producer<TSource>
|
|
|
|
|
|
|
+ internal abstract class DelaySubscription<TSource> : Producer<TSource>
|
|
|
{
|
|
{
|
|
|
private readonly IObservable<TSource> _source;
|
|
private readonly IObservable<TSource> _source;
|
|
|
- private readonly DateTimeOffset? _dueTimeA;
|
|
|
|
|
- private readonly TimeSpan? _dueTimeR;
|
|
|
|
|
private readonly IScheduler _scheduler;
|
|
private readonly IScheduler _scheduler;
|
|
|
|
|
|
|
|
- public DelaySubscription(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
|
|
|
|
|
|
|
+ public DelaySubscription(IObservable<TSource> source, IScheduler scheduler)
|
|
|
{
|
|
{
|
|
|
_source = source;
|
|
_source = source;
|
|
|
- _dueTimeA = dueTime;
|
|
|
|
|
_scheduler = scheduler;
|
|
_scheduler = scheduler;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- public DelaySubscription(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
|
|
|
|
|
|
|
+ internal sealed class Relative : DelaySubscription<TSource>
|
|
|
{
|
|
{
|
|
|
- _source = source;
|
|
|
|
|
- _dueTimeR = dueTime;
|
|
|
|
|
- _scheduler = scheduler;
|
|
|
|
|
|
|
+ private readonly TimeSpan _dueTime;
|
|
|
|
|
+
|
|
|
|
|
+ public Relative(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
|
|
|
|
|
+ : base(source, scheduler)
|
|
|
|
|
+ {
|
|
|
|
|
+ _dueTime = dueTime;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
|
|
+ {
|
|
|
|
|
+ var sink = new _(observer, cancel);
|
|
|
|
|
+ setSink(sink);
|
|
|
|
|
+ return _scheduler.Schedule(sink, _dueTime, Subscribe);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
|
|
|
|
+ internal sealed class Absolute : DelaySubscription<TSource>
|
|
|
{
|
|
{
|
|
|
- var sink = new _(observer, cancel);
|
|
|
|
|
- setSink(sink);
|
|
|
|
|
|
|
+ private readonly DateTimeOffset _dueTime;
|
|
|
|
|
|
|
|
- if (_dueTimeA.HasValue)
|
|
|
|
|
|
|
+ public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
|
|
|
|
|
+ : base(source, scheduler)
|
|
|
{
|
|
{
|
|
|
- return _scheduler.Schedule(sink, _dueTimeA.Value, Subscribe);
|
|
|
|
|
|
|
+ _dueTime = dueTime;
|
|
|
}
|
|
}
|
|
|
- else
|
|
|
|
|
|
|
+
|
|
|
|
|
+ protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
{
|
|
|
- return _scheduler.Schedule(sink, _dueTimeR.Value, Subscribe);
|
|
|
|
|
|
|
+ var sink = new _(observer, cancel);
|
|
|
|
|
+ setSink(sink);
|
|
|
|
|
+ return _scheduler.Schedule(sink, _dueTime, Subscribe);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|