// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#if HAS_AWAIT
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Concurrency
{
public static partial class Scheduler
{
/// <summary>
/// Gives other work items on the scheduler a chance to run before the current work item proceeds.
/// Await the returned operation to have the rest of the current work item (the continuation) scheduled on <paramref name="scheduler"/>.
/// </summary>
/// <param name="scheduler">Scheduler on which the continuation is scheduled.</param>
/// <returns>Scheduler operation to await in order to schedule the continuation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public static SchedulerOperation Yield(this IScheduler scheduler)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    // Picks up the token carried by a CancelableScheduler; any other scheduler yields CancellationToken.None.
    var token = scheduler.GetCancellationToken();

    return new SchedulerOperation(continuation => scheduler.Schedule(continuation), token);
}
/// <summary>
/// Gives other work items on the scheduler a chance to run before the current work item proceeds.
/// Await the returned operation to have the rest of the current work item (the continuation) scheduled on <paramref name="scheduler"/>.
/// </summary>
/// <param name="scheduler">Scheduler on which the continuation is scheduled.</param>
/// <param name="cancellationToken">Cancellation token handed to the scheduler operation, used to cancel the continuation.</param>
/// <returns>Scheduler operation to await in order to schedule the continuation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public static SchedulerOperation Yield(this IScheduler scheduler, CancellationToken cancellationToken)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    return new SchedulerOperation(
        continuation => scheduler.Schedule(continuation),
        cancellationToken);
}
/// <summary>
/// Pauses the current work item on the scheduler for a relative amount of time.
/// Await the returned operation to have the rest of the current work item (the continuation) scheduled once <paramref name="dueTime"/> has elapsed.
/// </summary>
/// <param name="scheduler">Scheduler on which the continuation is scheduled.</param>
/// <param name="dueTime">Relative time after which the continuation should run.</param>
/// <returns>Scheduler operation to await in order to schedule the continuation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    // Picks up the token carried by a CancelableScheduler; any other scheduler yields CancellationToken.None.
    var token = scheduler.GetCancellationToken();

    return new SchedulerOperation(continuation => scheduler.Schedule(dueTime, continuation), token);
}
/// <summary>
/// Pauses the current work item on the scheduler for a relative amount of time.
/// Await the returned operation to have the rest of the current work item (the continuation) scheduled once <paramref name="dueTime"/> has elapsed.
/// </summary>
/// <param name="scheduler">Scheduler on which the continuation is scheduled.</param>
/// <param name="dueTime">Relative time after which the continuation should run.</param>
/// <param name="cancellationToken">Cancellation token handed to the scheduler operation, used to cancel the continuation.</param>
/// <returns>Scheduler operation to await in order to schedule the continuation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime, CancellationToken cancellationToken)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    return new SchedulerOperation(
        continuation => scheduler.Schedule(dueTime, continuation),
        cancellationToken);
}
/// <summary>
/// Pauses the current work item on the scheduler until an absolute point in time.
/// Await the returned operation to have the rest of the current work item (the continuation) scheduled at <paramref name="dueTime"/>.
/// </summary>
/// <param name="scheduler">Scheduler on which the continuation is scheduled.</param>
/// <param name="dueTime">Absolute time at which the continuation should run.</param>
/// <returns>Scheduler operation to await in order to schedule the continuation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    // Picks up the token carried by a CancelableScheduler; any other scheduler yields CancellationToken.None.
    var token = scheduler.GetCancellationToken();

    return new SchedulerOperation(continuation => scheduler.Schedule(dueTime, continuation), token);
}
/// <summary>
/// Pauses the current work item on the scheduler until an absolute point in time.
/// Await the returned operation to have the rest of the current work item (the continuation) scheduled at <paramref name="dueTime"/>.
/// </summary>
/// <param name="scheduler">Scheduler on which the continuation is scheduled.</param>
/// <param name="dueTime">Absolute time at which the continuation should run.</param>
/// <param name="cancellationToken">Cancellation token handed to the scheduler operation, used to cancel the continuation.</param>
/// <returns>Scheduler operation to await in order to schedule the continuation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime, CancellationToken cancellationToken)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    return new SchedulerOperation(
        continuation => scheduler.Schedule(dueTime, continuation),
        cancellationToken);
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="state">State to pass to the asynchronous method.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    return ScheduleAsync_(scheduler, state, action);
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="state">State to pass to the asynchronous method.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. The disposable produced by its task is disposed together with the returned object.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    return ScheduleAsync_(scheduler, state, action);
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    // No caller state: a null object placeholder is threaded through and ignored by the adapter lambda.
    return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct));
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. The disposable produced by its task is disposed together with the returned object.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    // No caller state: a null object placeholder is threaded through and ignored by the adapter lambda.
    return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct));
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="state">State to pass to the asynchronous method.</param>
/// <param name="dueTime">Relative time after which to execute the action.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    return ScheduleAsync_(scheduler, state, dueTime, action);
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="state">State to pass to the asynchronous method.</param>
/// <param name="dueTime">Relative time after which to execute the action.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. The disposable produced by its task is disposed together with the returned object.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    return ScheduleAsync_(scheduler, state, dueTime, action);
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="dueTime">Relative time after which to execute the action.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    // No caller state: a null object placeholder is threaded through and ignored by the adapter lambda.
    return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="dueTime">Relative time after which to execute the action.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. The disposable produced by its task is disposed together with the returned object.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    // No caller state: a null object placeholder is threaded through and ignored by the adapter lambda.
    return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="state">State to pass to the asynchronous method.</param>
/// <param name="dueTime">Absolute time at which to execute the action.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    return ScheduleAsync_(scheduler, state, dueTime, action);
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="state">State to pass to the asynchronous method.</param>
/// <param name="dueTime">Absolute time at which to execute the action.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. The disposable produced by its task is disposed together with the returned object.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    return ScheduleAsync_(scheduler, state, dueTime, action);
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="dueTime">Absolute time at which to execute the action.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    // No caller state: a null object placeholder is threaded through and ignored by the adapter lambda.
    return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
}
/// <summary>
/// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
/// </summary>
/// <param name="scheduler">Scheduler to schedule work on.</param>
/// <param name="dueTime">Absolute time at which to execute the action.</param>
/// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points. The disposable produced by its task is disposed together with the returned object.</param>
/// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
{
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    // No caller state: a null object placeholder is threaded through and ignored by the adapter lambda.
    return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
}
// Schedules immediate work whose body starts the asynchronous action via InvokeAsync (arguments already validated by the public overloads).
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
{
    return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action));
}
// Schedules immediate work whose body starts the disposable-returning asynchronous action via InvokeAsync.
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
    return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action));
}
// Schedules work after a relative due time; the scheduled body starts the asynchronous action via InvokeAsync.
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
    return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
}
// Schedules work after a relative due time; the scheduled body starts the disposable-returning asynchronous action via InvokeAsync.
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
    return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
}
// Schedules work at an absolute due time; the scheduled body starts the asynchronous action via InvokeAsync.
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
    return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
}
// Schedules work at an absolute due time; the scheduled body starts the disposable-returning asynchronous action via InvokeAsync.
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
    return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
}
// Starts the asynchronous action, handing it a CancelableScheduler wrapper around 'self' and a fresh cancellation token.
// Returns a composite of the cancellation source and a placeholder that later receives the disposable produced by the action's task.
private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
    var c = new CancellationDisposable();
    var d = new SingleAssignmentDisposable();

    action(new CancelableScheduler(self, c.Token), s, c.Token).ContinueWith(t =>
    {
        // A canceled task has no result to hand over.
        if (t.IsCanceled)
            return;

        // Handle rethrows from this continuation when any inner exception is not an OperationCanceledException.
        if (t.Exception != null)
            t.Exception.Handle(e => e is OperationCanceledException);

        // NOTE(review): for a faulted task, reading Result throws inside this continuation as well — confirm this is intended.
        d.Disposable = t.Result;
    }, TaskContinuationOptions.ExecuteSynchronously);

    return StableCompositeDisposable.Create(c, d);
}
// Adapts an action returning a plain Task to the Task<IDisposable> overload by mapping its completion to Disposable.Empty.
// NOTE(review): the continuation uses default options, so it also runs for a faulted or canceled task and replaces that outcome
// with Disposable.Empty — confirm that dropping the original fault/cancellation here is intended.
private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)
{
    return InvokeAsync(self, s, (self_, state, ct) => action(self_, state, ct).ContinueWith(_ => Disposable.Empty));
}
// Returns the token carried by a CancelableScheduler, or CancellationToken.None for any other scheduler (including null).
private static CancellationToken GetCancellationToken(this IScheduler scheduler)
{
    var cancelable = scheduler as CancelableScheduler;
    if (cancelable == null)
    {
        return CancellationToken.None;
    }

    return cancelable.Token;
}
/// <summary>
/// Scheduler wrapper that pairs an underlying scheduler with a cancellation token.
/// All scheduling calls and the clock are forwarded unchanged to the wrapped scheduler; the token is only exposed through <see cref="Token"/>.
/// </summary>
private sealed class CancelableScheduler : IScheduler
{
    private readonly IScheduler _scheduler;
    private readonly CancellationToken _cancellationToken;

    /// <summary>
    /// Creates a wrapper around the given scheduler that carries the given cancellation token.
    /// </summary>
    /// <param name="scheduler">Scheduler to forward all work to.</param>
    /// <param name="cancellationToken">Token to expose through <see cref="Token"/>.</param>
    public CancelableScheduler(IScheduler scheduler, CancellationToken cancellationToken)
    {
        _scheduler = scheduler;
        _cancellationToken = cancellationToken;
    }

    /// <summary>
    /// Gets the cancellation token associated with this scheduler wrapper.
    /// </summary>
    public CancellationToken Token
    {
        get { return _cancellationToken; }
    }

    /// <summary>
    /// Gets the current time as reported by the wrapped scheduler.
    /// </summary>
    public DateTimeOffset Now
    {
        get { return _scheduler.Now; }
    }

    /// <summary>
    /// Forwards immediate scheduling to the wrapped scheduler.
    /// </summary>
    public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
    {
        return _scheduler.Schedule(state, action);
    }

    /// <summary>
    /// Forwards relative-time scheduling to the wrapped scheduler.
    /// </summary>
    public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        return _scheduler.Schedule(state, dueTime, action);
    }

    /// <summary>
    /// Forwards absolute-time scheduling to the wrapped scheduler.
    /// </summary>
    public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        return _scheduler.Schedule(state, dueTime, action);
    }
}
}
}
#endif