// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Concurrency
{
    public static partial class Scheduler
    {
        /// <summary>
        /// Represents one invocation of an asynchronous scheduled action. Disposing the instance requests
        /// cancellation on the token handed to the action and releases the disposable stored in <c>_run</c>.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        private sealed class AsyncInvocation<TState> : IDisposable
        {
            private readonly CancellationTokenSource _cts = new CancellationTokenSource();
            private IDisposable _run;

            /// <summary>
            /// Invokes <paramref name="action"/> with a <see cref="CancelableScheduler"/> wrapping <paramref name="self"/>
            /// and attaches a continuation that stores the task's result in <c>_run</c>.
            /// </summary>
            /// <param name="self">Scheduler the work item is running on.</param>
            /// <param name="s">State to pass to the asynchronous method.</param>
            /// <param name="action">Asynchronous method to run.</param>
            /// <returns>
            /// <see cref="Disposable.Empty"/> when cancellation was already requested; otherwise this instance.
            /// </returns>
            public IDisposable Run(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
            {
                if (_cts.IsCancellationRequested)
                {
                    return Disposable.Empty;
                }

                action(new CancelableScheduler(self, _cts.Token), s, _cts.Token).ContinueWith(
                    (t, thisObject) =>
                    {
                        // A canceled task carries neither a result nor an exception worth storing.
                        if (!t.IsCanceled)
                        {
                            // The instance travels through the state argument so the lambda captures nothing.
                            var @this = (AsyncInvocation<TState>)thisObject;

                            // Only OperationCanceledException is marked as handled here.
                            t.Exception?.Handle(e => e is OperationCanceledException);

                            Disposable.SetSingle(ref @this._run, t.Result);
                        }
                    },
                    this,
                    TaskContinuationOptions.ExecuteSynchronously);

                return this;
            }

            /// <summary>
            /// Requests cancellation and then disposes the stored result via <c>Disposable.TryDispose</c>.
            /// </summary>
            public void Dispose()
            {
                _cts.Cancel();
                Disposable.TryDispose(ref _run);
            }
        }

        /// <summary>
        /// Yields execution of the current work item on the scheduler to another work item on the scheduler.
        /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation).
        /// </summary>
        /// <param name="scheduler">Scheduler to yield work on.</param>
        /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public static SchedulerOperation Yield(this IScheduler scheduler)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new SchedulerOperation(a => scheduler.Schedule(a), scheduler.GetCancellationToken());
        }

        /// <summary>
        /// Yields execution of the current work item on the scheduler to another work item on the scheduler.
        /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation).
        /// </summary>
        /// <param name="scheduler">Scheduler to yield work on.</param>
        /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param>
        /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public static SchedulerOperation Yield(this IScheduler scheduler, CancellationToken cancellationToken)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new SchedulerOperation(a => scheduler.Schedule(a), cancellationToken);
        }

        /// <summary>
        /// Suspends execution of the current work item on the scheduler for the specified duration.
        /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration.
        /// </summary>
        /// <param name="scheduler">Scheduler to yield work on.</param>
        /// <param name="dueTime">Time when the continuation should run.</param>
        /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken());
        }

        /// <summary>
        /// Suspends execution of the current work item on the scheduler for the specified duration.
        /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration.
        /// </summary>
        /// <param name="scheduler">Scheduler to yield work on.</param>
        /// <param name="dueTime">Time when the continuation should run.</param>
        /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param>
        /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime, CancellationToken cancellationToken)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken);
        }

        /// <summary>
        /// Suspends execution of the current work item on the scheduler until the specified due time.
        /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time.
        /// </summary>
        /// <param name="scheduler">Scheduler to yield work on.</param>
        /// <param name="dueTime">Time when the continuation should run.</param>
        /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken());
        }

        /// <summary>
        /// Suspends execution of the current work item on the scheduler until the specified due time.
        /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time.
        /// </summary>
        /// <param name="scheduler">Scheduler to yield work on.</param>
        /// <param name="dueTime">Time when the continuation should run.</param>
        /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param>
        /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime, CancellationToken cancellationToken)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken);
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="state">State to pass to the asynchronous method.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ScheduleAsync_(scheduler, state, action);
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="state">State to pass to the asynchronous method.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ScheduleAsync_(scheduler, state, action);
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // The user delegate is passed as the state so the adapter lambda does not capture it.
            return ScheduleAsync_(scheduler, action, (self, closureAction, ct) => closureAction(self, ct));
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // The user delegate is passed as the state so the adapter lambda does not capture it.
            return ScheduleAsync_(scheduler, action, (self, closureAction, ct) => closureAction(self, ct));
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="state">State to pass to the asynchronous method.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ScheduleAsync_(scheduler, state, dueTime, action);
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="state">State to pass to the asynchronous method.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ScheduleAsync_(scheduler, state, dueTime, action);
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // The user delegate is passed as the state so the adapter lambda does not capture it.
            return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // The user delegate is passed as the state so the adapter lambda does not capture it.
            return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="state">State to pass to the asynchronous method.</param>
        /// <param name="dueTime">Absolute time at which to execute the action.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ScheduleAsync_(scheduler, state, dueTime, action);
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="state">State to pass to the asynchronous method.</param>
        /// <param name="dueTime">Absolute time at which to execute the action.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ScheduleAsync_(scheduler, state, dueTime, action);
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="dueTime">Absolute time at which to execute the action.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // The user delegate is passed as the state so the adapter lambda does not capture it.
            return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
        }

        /// <summary>
        /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
        /// </summary>
        /// <param name="scheduler">Scheduler to schedule work on.</param>
        /// <param name="dueTime">Absolute time at which to execute the action.</param>
        /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
        /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
        public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // The user delegate is passed as the state so the adapter lambda does not capture it.
            return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
        }

        // The ScheduleAsync_ helpers below skip argument validation (the public overloads do it) and pack
        // (state, action) into a tuple so the lambda given to IScheduler.Schedule captures nothing.

        /// <summary>Schedules <paramref name="action"/> for immediate execution and runs it through <c>InvokeAsync</c>.</summary>
        private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
        {
            return scheduler.Schedule((state, action), (self, t) => InvokeAsync(self, t.state, t.action));
        }

        /// <summary>Schedules <paramref name="action"/> for immediate execution and runs it through <c>InvokeAsync</c>.</summary>
        private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
        {
            return scheduler.Schedule((state, action), (self, t) => InvokeAsync(self, t.state, t.action));
        }

        /// <summary>Schedules <paramref name="action"/> after the relative <paramref name="dueTime"/> and runs it through <c>InvokeAsync</c>.</summary>
        private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
        {
            return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
        }

        /// <summary>Schedules <paramref name="action"/> after the relative <paramref name="dueTime"/> and runs it through <c>InvokeAsync</c>.</summary>
        private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
        {
            return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
        }

        /// <summary>Schedules <paramref name="action"/> at the absolute <paramref name="dueTime"/> and runs it through <c>InvokeAsync</c>.</summary>
        private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
        {
            return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
        }

        /// <summary>Schedules <paramref name="action"/> at the absolute <paramref name="dueTime"/> and runs it through <c>InvokeAsync</c>.</summary>
        private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
        {
            return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
        }

        /// <summary>
        /// Starts <paramref name="action"/> inside a new <see cref="AsyncInvocation{TState}"/> and returns whatever its Run method returns.
        /// </summary>
        private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
        {
            return new AsyncInvocation<TState>().Run(self, s, action);
        }

        /// <summary>
        /// Adapts a <see cref="Task"/>-returning action to the <see cref="Task{TResult}"/> overload by appending a
        /// continuation that produces <see cref="Disposable.Empty"/>.
        /// </summary>
        private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)
        {
            // NOTE(review): the continuation is registered with default options and never inspects its antecedent,
            // so a faulted or canceled task still yields Disposable.Empty - confirm that dropping the fault is intended.
            return InvokeAsync(self, (action, state: s), (self_, t, ct) => t.action(self_, t.state, ct).ContinueWith(_ => Disposable.Empty));
        }

        /// <summary>
        /// Returns the token carried by <paramref name="scheduler"/> when it is a <see cref="CancelableScheduler"/>;
        /// otherwise <see cref="CancellationToken.None"/>.
        /// </summary>
        private static CancellationToken GetCancellationToken(this IScheduler scheduler)
        {
            return scheduler is CancelableScheduler cs ? cs.Token : CancellationToken.None;
        }

        /// <summary>
        /// Scheduler wrapper that forwards every <see cref="IScheduler"/> member to the wrapped scheduler and
        /// additionally exposes the cancellation token it was constructed with.
        /// </summary>
        private sealed class CancelableScheduler : IScheduler
        {
            private readonly IScheduler _scheduler;
            private readonly CancellationToken _cancellationToken;

            public CancelableScheduler(IScheduler scheduler, CancellationToken cancellationToken)
            {
                _scheduler = scheduler;
                _cancellationToken = cancellationToken;
            }

            /// <summary>Cancellation token supplied at construction.</summary>
            public CancellationToken Token => _cancellationToken;

            public DateTimeOffset Now => _scheduler.Now;

            public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
            {
                return _scheduler.Schedule(state, action);
            }

            public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
            {
                return _scheduler.Schedule(state, dueTime, action);
            }

            public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
            {
                return _scheduler.Schedule(state, dueTime, action);
            }
        }
    }
}