|
|
@@ -10,6 +10,41 @@ namespace System.Reactive.Concurrency
|
|
|
{
|
|
|
public static partial class Scheduler
|
|
|
{
|
|
|
+ private sealed class AsyncInvocation<TState> : IDisposable
|
|
|
+ {
|
|
|
+ private readonly CancellationTokenSource _cts = new CancellationTokenSource();
|
|
|
+ private IDisposable _run;
|
|
|
+
|
|
|
+ public IDisposable Run(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
|
|
|
+ {
|
|
|
+ if (_cts.IsCancellationRequested)
|
|
|
+ return Disposable.Empty;
|
|
|
+
|
|
|
+ action(new CancelableScheduler(self, _cts.Token), s, _cts.Token).ContinueWith(
|
|
|
+ (t, @thisObject) =>
|
|
|
+ {
|
|
|
+ if (!t.IsCanceled)
|
|
|
+ {
|
|
|
+ var @this = (AsyncInvocation<TState>)@thisObject;
|
|
|
+
|
|
|
+ t.Exception?.Handle(e => e is OperationCanceledException);
|
|
|
+
|
|
|
+ Disposable.SetSingle(ref @this._run, t.Result);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ this,
|
|
|
+ TaskContinuationOptions.ExecuteSynchronously);
|
|
|
+
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void Dispose()
|
|
|
+ {
|
|
|
+ _cts.Cancel();
|
|
|
+ Disposable.TryDispose(ref _run);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/// <summary>
|
|
|
/// Yields execution of the current work item on the scheduler to another work item on the scheduler.
|
|
|
/// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation).
|
|
|
@@ -435,25 +470,7 @@ namespace System.Reactive.Concurrency
|
|
|
|
|
|
private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
|
|
|
{
|
|
|
- var c = new CancellationDisposable();
|
|
|
- var d = new SingleAssignmentDisposable();
|
|
|
-
|
|
|
- action(new CancelableScheduler(self, c.Token), s, c.Token).ContinueWith(t =>
|
|
|
- {
|
|
|
- if (t.IsCanceled)
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- if (t.Exception != null)
|
|
|
- {
|
|
|
- t.Exception.Handle(e => e is OperationCanceledException);
|
|
|
- }
|
|
|
-
|
|
|
- d.Disposable = t.Result;
|
|
|
- }, TaskContinuationOptions.ExecuteSynchronously);
|
|
|
-
|
|
|
- return StableCompositeDisposable.Create(c, d);
|
|
|
+ return new AsyncInvocation<TState>().Run(self, s, action);
|
|
|
}
|
|
|
|
|
|
private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)
|