Browse Source

Merge pull request #1312 from dotnet/dev/bartde/scheduler_value_task

Use ValueTask on schedulers.
Bart J.F. De Smet 5 years ago
parent
commit
612f64605e

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Concurrency/System/Reactive/Concurrency/AsyncScheduler.cs

@@ -90,7 +90,7 @@ namespace System.Reactive.Concurrency
                     tcs.SetResult(true);
                 }
 
-                return Task.CompletedTask;
+                return default;
             }, dueTime);
 
             using (token.Register(() => task.DisposeAsync()))
@@ -117,7 +117,7 @@ namespace System.Reactive.Concurrency
                     tcs.SetResult(true);
                 }
 
-                return Task.CompletedTask;
+                return default;
             }, dueTime);
 
             using (token.Register(() => task.DisposeAsync()))

+ 6 - 6
AsyncRx.NET/System.Reactive.Async.Concurrency/System/Reactive/Concurrency/AsyncSchedulerBase.cs

@@ -11,7 +11,7 @@ namespace System.Reactive.Concurrency
     {
         public virtual DateTimeOffset Now => DateTimeOffset.Now;
 
-        public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action)
+        public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action)
         {
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
@@ -19,7 +19,7 @@ namespace System.Reactive.Concurrency
             return ScheduleAsyncCore(action);
         }
 
-        public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, TimeSpan dueTime)
+        public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, TimeSpan dueTime)
         {
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
@@ -34,7 +34,7 @@ namespace System.Reactive.Concurrency
             });
         }
 
-        public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, DateTimeOffset dueTime)
+        public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, DateTimeOffset dueTime)
         {
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
@@ -49,7 +49,7 @@ namespace System.Reactive.Concurrency
             });
         }
 
-        protected virtual async ValueTask<IAsyncDisposable> ScheduleAsyncCore(Func<CancellationToken, Task> action)
+        protected virtual async ValueTask<IAsyncDisposable> ScheduleAsyncCore(Func<CancellationToken, ValueTask> action)
         {
             var cad = new CancellationAsyncDisposable();
 
@@ -58,9 +58,9 @@ namespace System.Reactive.Concurrency
             return cad;
         }
 
-        protected abstract Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token);
+        protected abstract ValueTask ScheduleAsyncCore(Func<CancellationToken, ValueTask> action, CancellationToken token);
 
-        protected abstract Task Delay(TimeSpan dueTime, CancellationToken token);
+        protected abstract ValueTask Delay(TimeSpan dueTime, CancellationToken token);
 
         protected static TimeSpan Normalize(TimeSpan timeSpan) => timeSpan < TimeSpan.Zero ? TimeSpan.Zero : timeSpan;
 

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Concurrency/System/Reactive/Concurrency/ImmediateAsyncScheduler.cs

@@ -13,8 +13,8 @@ namespace System.Reactive.Concurrency
 
         private ImmediateAsyncScheduler() { }
 
-        protected override Task Delay(TimeSpan dueTime, CancellationToken token) => Task.Delay(dueTime);
+        protected override ValueTask Delay(TimeSpan dueTime, CancellationToken token) => new ValueTask(Task.Delay(dueTime));
 
-        protected override Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token) => action(token);
+        protected override ValueTask ScheduleAsyncCore(Func<CancellationToken, ValueTask> action, CancellationToken token) => action(token);
     }
 }

+ 3 - 3
AsyncRx.NET/System.Reactive.Async.Concurrency/System/Reactive/Concurrency/SynchronizationContextAsyncScheduler.cs

@@ -16,9 +16,9 @@ namespace System.Reactive.Concurrency
             _context = context ?? throw new ArgumentNullException(nameof(context));
         }
 
-        protected override Task Delay(TimeSpan dueTime, CancellationToken token) => Task.Delay(dueTime, token);
+        protected override ValueTask Delay(TimeSpan dueTime, CancellationToken token) => new ValueTask(Task.Delay(dueTime, token));
 
-        protected override Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token)
+        protected override ValueTask ScheduleAsyncCore(Func<CancellationToken, ValueTask> action, CancellationToken token)
         {
             _context.Post(_ =>
             {
@@ -28,7 +28,7 @@ namespace System.Reactive.Concurrency
                 }
             }, null);
 
-            return Task.CompletedTask;
+            return default;
         }
     }
 }

+ 4 - 4
AsyncRx.NET/System.Reactive.Async.Concurrency/System/Reactive/Concurrency/TaskPoolAsyncScheduler.cs

@@ -27,11 +27,11 @@ namespace System.Reactive.Concurrency
             _factory = factory ?? throw new ArgumentNullException(nameof(factory));
         }
 
-        protected override Task Delay(TimeSpan dueTime, CancellationToken token) => Task.Delay(dueTime, token);
+        protected override ValueTask Delay(TimeSpan dueTime, CancellationToken token) => new ValueTask(Task.Delay(dueTime, token));
 
-        protected override Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token)
+        protected override ValueTask ScheduleAsyncCore(Func<CancellationToken, ValueTask> action, CancellationToken token)
         {
-            var task = _factory.StartNew(() => action(token), token);
+            var task = _factory.StartNew(() => action(token).AsTask(), token);
 
             task.Unwrap().ContinueWith(t =>
             {
@@ -41,7 +41,7 @@ namespace System.Reactive.Concurrency
                 }
             });
 
-            return Task.CompletedTask;
+            return default;
         }
     }
 }

+ 3 - 3
AsyncRx.NET/System.Reactive.Async.Interfaces/System/Reactive/Concurrency/IAsyncScheduler.cs

@@ -9,8 +9,8 @@ namespace System.Reactive.Concurrency
 {
     public interface IAsyncScheduler : IClock
     {
-        ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action);
-        ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, TimeSpan dueTime);
-        ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, DateTimeOffset dueTime);
+        ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action);
+        ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, TimeSpan dueTime);
+        ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, DateTimeOffset dueTime);
     }
 }

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Linq/System/Runtime/CompilerServices/AsyncObservableMethodBuilder.cs

@@ -187,7 +187,7 @@ namespace System.Runtime.CompilerServices
             {
                 ExceptionDispatchInfo.Capture(exception).Throw();
 
-                return System.Threading.Tasks.Task.CompletedTask;
+                return default;
             });
         }
 

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/FastImmediateAsyncObserver.cs

@@ -27,6 +27,6 @@ namespace System.Reactive
 
         protected override IAwaitable<R> RendezVous<R>(ValueTask<R> task) => new ValueTaskAwaitable<R>(task, false, null, CancellationToken.None);
 
-        protected override Task ScheduleAsync() => RunAsync(_disposable.Token);
+        protected override ValueTask ScheduleAsync() => RunAsync(_disposable.Token);
     }
 }

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/ISchedulerAsyncObserver.cs

@@ -8,8 +8,8 @@ namespace System.Reactive
 {
     internal interface IScheduledAsyncObserver<T> : IAsyncObserver<T>, IAsyncDisposable
     {
-        Task EnsureActive();
+        ValueTask EnsureActive();
 
-        Task EnsureActive(int count);
+        ValueTask EnsureActive(int count);
     }
 }

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/ScheduledAsyncObserver.cs

@@ -31,7 +31,7 @@ namespace System.Reactive
 
         protected override IAwaitable<R> RendezVous<R>(ValueTask<R> task) => new ValueTaskAwaitable<R>(task, false, _scheduler, CancellationToken.None);
 
-        protected override async Task ScheduleAsync()
+        protected override async ValueTask ScheduleAsync()
         {
             var d = await _scheduler.ScheduleAsync(RunAsync).ConfigureAwait(false);
             await _disposable.AssignAsync(d).ConfigureAwait(false);

+ 4 - 4
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/ScheduledAsyncObserverBase.cs

@@ -25,9 +25,9 @@ namespace System.Reactive
             _observer = observer;
         }
 
-        public Task EnsureActive() => EnsureActive(1);
+        public ValueTask EnsureActive() => EnsureActive(1);
 
-        public async Task EnsureActive(int count)
+        public async ValueTask EnsureActive(int count)
         {
             var shouldRun = false;
 
@@ -46,9 +46,9 @@ namespace System.Reactive
             }
         }
 
-        protected abstract Task ScheduleAsync();
+        protected abstract ValueTask ScheduleAsync();
 
-        protected async Task RunAsync(CancellationToken token)
+        protected async ValueTask RunAsync(CancellationToken token)
         {
             while (!token.IsCancellationRequested)
             {

+ 13 - 13
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/Subjects/ReplayAsyncSubject.cs

@@ -230,11 +230,11 @@ namespace System.Reactive.Subjects
                 }
             }
 
-            private async Task EnsureActive(IScheduledAsyncObserver<T>[] observers)
+            private async ValueTask EnsureActive(IScheduledAsyncObserver<T>[] observers)
             {
                 if (_concurrent)
                 {
-                    await Task.WhenAll(observers.Select(o => o.EnsureActive())).ConfigureAwait(false);
+                    await Task.WhenAll(observers.Select(o => o.EnsureActive().AsTask())).ConfigureAwait(false);
                 }
                 else
                 {
@@ -285,9 +285,9 @@ namespace System.Reactive.Subjects
 
             protected abstract IScheduledAsyncObserver<T> CreateScheduledObserver(IAsyncObserver<T> observer);
 
-            protected abstract Task NextAsync(T value);
+            protected abstract ValueTask NextAsync(T value);
 
-            protected abstract Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer);
+            protected abstract ValueTask<int> ReplayAsync(IScheduledAsyncObserver<T> observer);
 
             protected abstract void Trim();
 
@@ -337,15 +337,15 @@ namespace System.Reactive.Subjects
             {
             }
 
-            protected override Task NextAsync(T value)
+            protected override ValueTask NextAsync(T value)
             {
                 _hasValue = true;
                 _value = value;
 
-                return Task.CompletedTask;
+                return default;
             }
 
-            protected override async Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
+            protected override async ValueTask<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
             {
                 if (_hasValue)
                 {
@@ -368,14 +368,14 @@ namespace System.Reactive.Subjects
             {
             }
 
-            protected override Task NextAsync(T value)
+            protected override ValueTask NextAsync(T value)
             {
                 Values.Enqueue(value);
 
-                return Task.CompletedTask;
+                return default;
             }
 
-            protected override async Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
+            protected override async ValueTask<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
             {
                 var count = Values.Count;
 
@@ -432,14 +432,14 @@ namespace System.Reactive.Subjects
                 _window = window;
             }
 
-            protected override Task NextAsync(T value)
+            protected override ValueTask NextAsync(T value)
             {
                 _values.Enqueue(new Timestamped<T>(value, _scheduler.Now));
 
-                return Task.CompletedTask;
+                return default;
             }
 
-            protected override async Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
+            protected override async ValueTask<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
             {
                 var count = _values.Count;
 

+ 3 - 3
AsyncRx.NET/System.Reactive.Bcl/System/Threading/AsyncLock.cs

@@ -13,7 +13,7 @@ namespace System.Threading
         private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
         private readonly AsyncLocal<int> _recursionCount = new AsyncLocal<int>();
 
-        public Task<Releaser> LockAsync()
+        public ValueTask<Releaser> LockAsync()
         {
             var shouldAcquire = false;
 
@@ -32,10 +32,10 @@ namespace System.Threading
 
             if (shouldAcquire)
             {
-                return _semaphore.WaitAsync().ContinueWith(_ => new Releaser(this));
+                return new ValueTask<Releaser>(_semaphore.WaitAsync().ContinueWith(_ => new Releaser(this)));
             }
 
-            return Task.FromResult(new Releaser(this));
+            return new ValueTask<Releaser>(new Releaser(this));
         }
 
         private void Release()