浏览代码

Adding some initial scheduler implementations.

Bart De Smet 8 年之前
父节点
当前提交
19a304df31

+ 81 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Concurrency/AsyncSchedulerBase.cs

@@ -0,0 +1,81 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Reactive.Concurrency
+{
+    public abstract class AsyncSchedulerBase : IAsyncScheduler
+    {
+        public virtual DateTimeOffset Now => DateTimeOffset.Now;
+
+        public virtual Task<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action)
+        {
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ScheduleAsyncCore(action);
+        }
+
+        public virtual Task<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, TimeSpan dueTime)
+        {
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            var dueTimeRelative = Normalize(dueTime);
+
+            return ScheduleAsyncCore(async ct =>
+            {
+                await Delay(dueTimeRelative); // NB: Honor SynchronizationContext to stay on scheduler.
+
+                await action(ct);
+            });
+        }
+
+        public virtual Task<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, DateTimeOffset dueTime)
+        {
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ScheduleAsyncCore(async ct =>
+            {
+                var dueTimeRelative = Normalize(dueTime - Now); // TODO: Support clock drift and clock changes.
+
+                await Delay(dueTimeRelative); // NB: Honor SynchronizationContext to stay on scheduler.
+
+                await action(ct);
+            });
+        }
+
+        protected virtual async Task<IAsyncDisposable> ScheduleAsyncCore(Func<CancellationToken, Task> action)
+        {
+            var cad = new CancellationAsyncDisposable();
+
+            await ScheduleAsyncCore(action, cad.Token).ConfigureAwait(false);
+
+            return cad;
+        }
+
+        protected abstract Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token);
+
+        protected abstract Task Delay(TimeSpan dueTime);
+
+        protected static TimeSpan Normalize(TimeSpan timeSpan) => timeSpan < TimeSpan.Zero ? TimeSpan.Zero : timeSpan;
+
+        private sealed class CancellationAsyncDisposable : IAsyncDisposable
+        {
+            private readonly CancellationTokenSource _cts = new CancellationTokenSource();
+
+            public CancellationToken Token => _cts.Token;
+
+            public Task DisposeAsync()
+            {
+                _cts.Cancel();
+
+                return Task.CompletedTask;
+            }
+        }
+    }
+}

+ 20 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Concurrency/ImmediateAsyncScheduler.cs

@@ -0,0 +1,20 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Reactive.Concurrency
+{
+    public sealed class ImmediateAsyncScheduler : AsyncSchedulerBase
+    {
+        public static ImmediateAsyncScheduler Instance { get; } = new ImmediateAsyncScheduler();
+
+        private ImmediateAsyncScheduler() { }
+
+        protected override Task Delay(TimeSpan dueTime) => Task.Delay(dueTime);
+
+        protected override Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token) => action(token);
+    }
+}

+ 50 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Concurrency/TaskPoolAsyncScheduler.cs

@@ -0,0 +1,50 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Reactive.Concurrency
+{
+    public sealed class TaskPoolAsyncScheduler : AsyncSchedulerBase
+    {
+        private readonly TaskFactory _factory;
+
+        public static TaskPoolAsyncScheduler Current { get; } = new TaskPoolAsyncScheduler(TaskScheduler.Current);
+        public static TaskPoolAsyncScheduler Default { get; } = new TaskPoolAsyncScheduler(TaskScheduler.Default);
+
+        public TaskPoolAsyncScheduler(TaskScheduler scheduler)
+        {
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            _factory = new TaskFactory(scheduler);
+        }
+
+        public TaskPoolAsyncScheduler(TaskFactory factory)
+        {
+            if (factory == null)
+                throw new ArgumentNullException(nameof(factory));
+
+            _factory = factory;
+        }
+
+        protected override Task Delay(TimeSpan dueTime) => Task.Delay(dueTime);
+
+        protected override Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token)
+        {
+            var task = _factory.StartNew(() => action(token), token);
+
+            task.Unwrap().ContinueWith(t =>
+            {
+                if (t.Exception != null)
+                {
+                    // TODO: Call event handler.
+                }
+            });
+
+            return Task.CompletedTask;
+        }
+    }
+}