浏览代码

Add ExecuteAsync extension methods to IAsyncScheduler.

Bart De Smet 8 年之前
父节点
当前提交
1a50cadf72
共有 1 个文件被更改,包括 86 次插入0 次删除
  1. 86 0
      AsyncRx.NET/System.Reactive.Async/System/Reactive/Concurrency/AsyncScheduler.cs

+ 86 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Concurrency/AsyncScheduler.cs

@@ -48,5 +48,91 @@ namespace System.Reactive.Concurrency
                 await tcs.Task;
             }
         }
+
+        public static async Task ExecuteAsync(this IAsyncScheduler scheduler, Func<CancellationToken, Task> action, CancellationToken token = default(CancellationToken))
+        {
+            var tcs = new TaskCompletionSource<object>();
+
+            var d = await scheduler.ScheduleAsync(async ct =>
+            {
+                try
+                {
+                    ct.ThrowIfCancellationRequested();
+
+                    await action(ct).RendezVous(scheduler);
+                }
+                catch (OperationCanceledException ex) when (ex.CancellationToken == ct)
+                {
+                    tcs.TrySetCanceled();
+                }
+                catch (Exception ex)
+                {
+                    tcs.TrySetException(ex);
+                }
+                finally
+                {
+                    tcs.TrySetResult(null);
+                }
+            });
+
+            using (token.Register(() =>
+            {
+                try
+                {
+                    d.DisposeAsync();
+                }
+                finally
+                {
+                    tcs.TrySetCanceled();
+                }
+            }))
+            {
+                await tcs.Task.ConfigureAwait(false);
+            }
+        }
+
+        public static async Task<TResult> ExecuteAsync<TResult>(this IAsyncScheduler scheduler, Func<CancellationToken, Task<TResult>> action, CancellationToken token = default(CancellationToken))
+        {
+            var tcs = new TaskCompletionSource<TResult>();
+
+            var d = await scheduler.ScheduleAsync(async ct =>
+            {
+                var res = default(TResult);
+
+                try
+                {
+                    ct.ThrowIfCancellationRequested();
+
+                    res = await action(ct).RendezVous(scheduler);
+                }
+                catch (OperationCanceledException ex) when (ex.CancellationToken == ct)
+                {
+                    tcs.TrySetCanceled();
+                }
+                catch (Exception ex)
+                {
+                    tcs.TrySetException(ex);
+                }
+                finally
+                {
+                    tcs.TrySetResult(res);
+                }
+            });
+
+            using (token.Register(() =>
+            {
+                try
+                {
+                    d.DisposeAsync();
+                }
+                finally
+                {
+                    tcs.TrySetCanceled();
+                }
+            }))
+            {
+                return await tcs.Task.ConfigureAwait(false);
+            }
+        }
     }
 }