| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- #if HAS_AWAIT
- using System.Reactive.Disposables;
- using System.Threading;
- using System.Threading.Tasks;
- namespace System.Reactive.Concurrency
- {
- public static partial class Scheduler
- {
- /// <summary>
- /// Yields execution of the current work item on the scheduler to another work item on the scheduler.
- /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation).
- /// </summary>
- /// <param name="scheduler">Scheduler to yield work on.</param>
- /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
- public static SchedulerOperation Yield(this IScheduler scheduler)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- return new SchedulerOperation(a => scheduler.Schedule(a), scheduler.GetCancellationToken());
- }
- /// <summary>
- /// Yields execution of the current work item on the scheduler to another work item on the scheduler.
- /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation).
- /// </summary>
- /// <param name="scheduler">Scheduler to yield work on.</param>
- /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param>
- /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
- public static SchedulerOperation Yield(this IScheduler scheduler, CancellationToken cancellationToken)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- return new SchedulerOperation(a => scheduler.Schedule(a), cancellationToken);
- }
- /// <summary>
- /// Suspends execution of the current work item on the scheduler for the specified duration.
- /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration.
- /// </summary>
- /// <param name="scheduler">Scheduler to yield work on.</param>
- /// <param name="dueTime">Time when the continuation should run.</param>
- /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
- public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken());
- }
- /// <summary>
- /// Suspends execution of the current work item on the scheduler for the specified duration.
- /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration.
- /// </summary>
- /// <param name="scheduler">Scheduler to yield work on.</param>
- /// <param name="dueTime">Time when the continuation should run.</param>
- /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param>
- /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
- public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime, CancellationToken cancellationToken)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken);
- }
- /// <summary>
- /// Suspends execution of the current work item on the scheduler until the specified due time.
- /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time.
- /// </summary>
- /// <param name="scheduler">Scheduler to yield work on.</param>
- /// <param name="dueTime">Time when the continuation should run.</param>
- /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
- public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken());
- }
- /// <summary>
- /// Suspends execution of the current work item on the scheduler until the specified due time.
- /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time.
- /// </summary>
- /// <param name="scheduler">Scheduler to yield work on.</param>
- /// <param name="dueTime">Time when the continuation should run.</param>
- /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param>
- /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
- public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime, CancellationToken cancellationToken)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken);
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="state">State to pass to the asynchronous method.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_<TState>(scheduler, state, action);
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="state">State to pass to the asynchronous method.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_<TState>(scheduler, state, action);
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct));
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct));
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="state">State to pass to the asynchronous method.</param>
- /// <param name="dueTime">Relative time after which to execute the action.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, state, dueTime, action);
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="state">State to pass to the asynchronous method.</param>
- /// <param name="dueTime">Relative time after which to execute the action.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, state, dueTime, action);
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="dueTime">Relative time after which to execute the action.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="dueTime">Relative time after which to execute the action.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="state">State to pass to the asynchronous method.</param>
- /// <param name="dueTime">Absolute time at which to execute the action.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, state, dueTime, action);
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="state">State to pass to the asynchronous method.</param>
- /// <param name="dueTime">Absolute time at which to execute the action.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, state, dueTime, action);
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="dueTime">Absolute time at which to execute the action.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
- }
- /// <summary>
- /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
- /// </summary>
- /// <param name="scheduler">Scheduler to schedule work on.</param>
- /// <param name="dueTime">Absolute time at which to execute the action.</param>
- /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
- /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception>
- public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
- {
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
- if (action == null)
- throw new ArgumentNullException("action");
- return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
- }
- private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
- {
- return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action));
- }
- private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
- {
- return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action));
- }
- private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
- {
- return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
- }
- private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
- {
- return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
- }
- private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
- {
- return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
- }
- private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
- {
- return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
- }
- private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
- {
- var c = new CancellationDisposable();
- var d = new SingleAssignmentDisposable();
- action(new CancelableScheduler(self, c.Token), s, c.Token).ContinueWith(t =>
- {
- if (t.IsCanceled)
- return;
- if (t.Exception != null)
- t.Exception.Handle(e => e is OperationCanceledException);
- d.Disposable = t.Result;
- }, TaskContinuationOptions.ExecuteSynchronously);
- return StableCompositeDisposable.Create(c, d);
- }
- private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)
- {
- return InvokeAsync(self, s, (self_, state, ct) => action(self_, state, ct).ContinueWith(_ => Disposable.Empty));
- }
- private static CancellationToken GetCancellationToken(this IScheduler scheduler)
- {
- var cs = scheduler as CancelableScheduler;
- return cs != null ? cs.Token : CancellationToken.None;
- }
- class CancelableScheduler : IScheduler
- {
- private readonly IScheduler _scheduler;
- private readonly CancellationToken _cancellationToken;
- public CancelableScheduler(IScheduler scheduler, CancellationToken cancellationToken)
- {
- _scheduler = scheduler;
- _cancellationToken = cancellationToken;
- }
- public CancellationToken Token
- {
- get { return _cancellationToken; }
- }
- public DateTimeOffset Now
- {
- get { return _scheduler.Now; }
- }
- public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
- {
- return _scheduler.Schedule(state, action);
- }
- public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
- {
- return _scheduler.Schedule(state, dueTime, action);
- }
- public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
- {
- return _scheduler.Schedule(state, dueTime, action);
- }
- }
- }
- }
- #endif
|