123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Threading;
- using System.Threading.Tasks;
- namespace System.Reactive.Concurrency
- {
- public abstract class AsyncSchedulerBase : IAsyncScheduler
- {
- public virtual DateTimeOffset Now => DateTimeOffset.Now;
- public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action)
- {
- if (action == null)
- throw new ArgumentNullException(nameof(action));
- return ScheduleAsyncCore(action);
- }
- public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, TimeSpan dueTime)
- {
- if (action == null)
- throw new ArgumentNullException(nameof(action));
- var dueTimeRelative = Normalize(dueTime);
- return ScheduleAsyncCore(async ct =>
- {
- await Delay(dueTimeRelative, ct); // NB: Honor SynchronizationContext to stay on scheduler.
- await action(ct);
- });
- }
- public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, DateTimeOffset dueTime)
- {
- if (action == null)
- throw new ArgumentNullException(nameof(action));
- return ScheduleAsyncCore(async ct =>
- {
- var dueTimeRelative = Normalize(dueTime - Now); // TODO: Support clock drift and clock changes.
- await Delay(dueTimeRelative, ct); // NB: Honor SynchronizationContext to stay on scheduler.
- await action(ct);
- });
- }
- protected virtual async ValueTask<IAsyncDisposable> ScheduleAsyncCore(Func<CancellationToken, ValueTask> action)
- {
- var cad = new CancellationAsyncDisposable();
- await ScheduleAsyncCore(action, cad.Token).ConfigureAwait(false);
- return cad;
- }
- protected abstract ValueTask ScheduleAsyncCore(Func<CancellationToken, ValueTask> action, CancellationToken token);
- protected abstract ValueTask Delay(TimeSpan dueTime, CancellationToken token);
- protected static TimeSpan Normalize(TimeSpan timeSpan) => timeSpan < TimeSpan.Zero ? TimeSpan.Zero : timeSpan;
- private sealed class CancellationAsyncDisposable : IAsyncDisposable
- {
- private readonly CancellationTokenSource _cts = new();
- public CancellationToken Token => _cts.Token;
- public ValueTask DisposeAsync()
- {
- _cts.Cancel();
- return default;
- }
- }
- }
- }
|