// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #if HAS_AWAIT using System.Reactive.Disposables; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Concurrency { public static partial class Scheduler { /// /// Yields execution of the current work item on the scheduler to another work item on the scheduler. /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation). /// /// Scheduler to yield work on. /// Scheduler operation object to await in order to schedule the continuation. /// is null. public static SchedulerOperation Yield(this IScheduler scheduler) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return new SchedulerOperation(a => scheduler.Schedule(a), scheduler.GetCancellationToken()); } /// /// Yields execution of the current work item on the scheduler to another work item on the scheduler. /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation). /// /// Scheduler to yield work on. /// Cancellation token to cancel the continuation to run. /// Scheduler operation object to await in order to schedule the continuation. /// is null. public static SchedulerOperation Yield(this IScheduler scheduler, CancellationToken cancellationToken) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return new SchedulerOperation(a => scheduler.Schedule(a), cancellationToken); } /// /// Suspends execution of the current work item on the scheduler for the specified duration. /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration. /// /// Scheduler to yield work on. /// Time when the continuation should run. /// Scheduler operation object to await in order to schedule the continuation. /// is null. public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken()); } /// /// Suspends execution of the current work item on the scheduler for the specified duration. /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration. /// /// Scheduler to yield work on. /// Time when the continuation should run. /// Cancellation token to cancel the continuation to run. /// Scheduler operation object to await in order to schedule the continuation. /// is null. public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime, CancellationToken cancellationToken) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken); } /// /// Suspends execution of the current work item on the scheduler until the specified due time. /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time. /// /// Scheduler to yield work on. /// Time when the continuation should run. /// Scheduler operation object to await in order to schedule the continuation. /// is null. public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken()); } /// /// Suspends execution of the current work item on the scheduler until the specified due time. /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time. /// /// Scheduler to yield work on. /// Time when the continuation should run. /// Cancellation token to cancel the continuation to run. /// Scheduler operation object to await in order to schedule the continuation. /// is null. public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime, CancellationToken cancellationToken) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// The type of the state passed to the scheduled action. /// Scheduler to schedule work on. /// State to pass to the asynchronous method. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, Func action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, state, action); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// The type of the state passed to the scheduled action. /// Scheduler to schedule work on. /// State to pass to the asynchronous method. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, Func> action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, state, action); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// Scheduler to schedule work on. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, Func action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct)); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// Scheduler to schedule work on. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, Func> action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct)); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// The type of the state passed to the scheduled action. /// Scheduler to schedule work on. /// State to pass to the asynchronous method. /// Relative time after which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, TimeSpan dueTime, Func action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, state, dueTime, action); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// The type of the state passed to the scheduled action. /// Scheduler to schedule work on. /// State to pass to the asynchronous method. /// Relative time after which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, TimeSpan dueTime, Func> action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, state, dueTime, action); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// Scheduler to schedule work on. /// Relative time after which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct)); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// Scheduler to schedule work on. /// Relative time after which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func> action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct)); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// The type of the state passed to the scheduled action. /// Scheduler to schedule work on. /// State to pass to the asynchronous method. /// Absolute time at which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, state, dueTime, action); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// The type of the state passed to the scheduled action. /// Scheduler to schedule work on. /// State to pass to the asynchronous method. /// Absolute time at which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func> action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, state, dueTime, action); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// Scheduler to schedule work on. /// Absolute time at which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct)); } /// /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. /// /// Scheduler to schedule work on. /// Absolute time at which to execute the action. /// Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. /// Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method. /// or is null. public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func> action) { if (scheduler == null) throw new ArgumentNullException("scheduler"); if (action == null) throw new ArgumentNullException("action"); return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct)); } private static IDisposable ScheduleAsync_(IScheduler scheduler, TState state, Func action) { return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action)); } private static IDisposable ScheduleAsync_(IScheduler scheduler, TState state, Func> action) { return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action)); } private static IDisposable ScheduleAsync_(IScheduler scheduler, TState state, TimeSpan dueTime, Func action) { return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action)); } private static IDisposable ScheduleAsync_(IScheduler scheduler, TState state, TimeSpan dueTime, Func> action) { return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action)); } private static IDisposable ScheduleAsync_(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func action) { return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action)); } private static IDisposable ScheduleAsync_(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func> action) { return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action)); } private static IDisposable InvokeAsync(IScheduler self, TState s, Func> action) { var c = new CancellationDisposable(); var d = new SingleAssignmentDisposable(); action(new CancelableScheduler(self, c.Token), s, c.Token).ContinueWith(t => { if (t.IsCanceled) return; if (t.Exception != null) t.Exception.Handle(e => e is OperationCanceledException); d.Disposable = t.Result; }, TaskContinuationOptions.ExecuteSynchronously); return StableCompositeDisposable.Create(c, d); } private static IDisposable InvokeAsync(IScheduler self, TState s, Func action) { return InvokeAsync(self, s, (self_, state, ct) => action(self_, state, ct).ContinueWith(_ => Disposable.Empty)); } private static CancellationToken GetCancellationToken(this IScheduler scheduler) { var cs = scheduler as CancelableScheduler; return cs != null ? cs.Token : CancellationToken.None; } class CancelableScheduler : IScheduler { private readonly IScheduler _scheduler; private readonly CancellationToken _cancellationToken; public CancelableScheduler(IScheduler scheduler, CancellationToken cancellationToken) { _scheduler = scheduler; _cancellationToken = cancellationToken; } public CancellationToken Token { get { return _cancellationToken; } } public DateTimeOffset Now { get { return _scheduler.Now; } } public IDisposable Schedule(TState state, Func action) { return _scheduler.Schedule(state, action); } public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { return _scheduler.Schedule(state, dueTime, action); } public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) { return _scheduler.Schedule(state, dueTime, action); } } } } #endif