Browse Source

Improving rendezvous support for schedulers.

Bart De Smet 8 years ago
parent
commit
f9e4915dde

+ 1 - 0
AsyncRx.NET/System.Reactive.Async.Concurrency/System.Reactive.Async.Concurrency.csproj

@@ -6,6 +6,7 @@
 
   <ItemGroup>
     <ProjectReference Include="..\System.Reactive.Async.Interfaces\System.Reactive.Async.Interfaces.csproj" />
+    <ProjectReference Include="..\System.Reactive.Bcl\System.Reactive.Bcl.csproj" />
     <ProjectReference Include="..\System.Reactive.Shared\System.Reactive.Shared.csproj" />
   </ItemGroup>
 

+ 82 - 0
AsyncRx.NET/System.Reactive.Async.Concurrency/System/Reactive/Concurrency/AsyncScheduler.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Runtime.CompilerServices;
+using System.Runtime.ExceptionServices;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -12,13 +13,41 @@ namespace System.Reactive.Concurrency
     {
         // TODO: Implement proper RendezVous semantics.
 
+        public static IAwaitable RendezVous(this IAsyncScheduler scheduler)
+        {
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return new RendezVousAwaitable(scheduler, CancellationToken.None);
+        }
+
+        public static IAwaitable RendezVous(this IAsyncScheduler scheduler, CancellationToken token)
+        {
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            token.ThrowIfCancellationRequested();
+
+            return new RendezVousAwaitable(scheduler, token);
+        }
+
         public static ConfiguredTaskAwaitable RendezVous(this Task task, IAsyncScheduler scheduler)
         {
+            if (task == null)
+                throw new ArgumentNullException(nameof(task));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
             return task.ConfigureAwait(true);
         }
 
         public static ConfiguredTaskAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler scheduler)
         {
+            if (task == null)
+                throw new ArgumentNullException(nameof(task));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
             return task.ConfigureAwait(true);
         }
 
@@ -161,5 +190,58 @@ namespace System.Reactive.Concurrency
                 return await tcs.Task.ConfigureAwait(false);
             }
         }
+
+        private sealed class RendezVousAwaitable : IAwaitable, IAwaiter // PERF: Can we avoid these allocations?
+        {
+            private readonly IAsyncScheduler _scheduler;
+            private readonly CancellationToken _token;
+
+            private bool _done;
+            private ExceptionDispatchInfo _error;
+
+            public RendezVousAwaitable(IAsyncScheduler scheduler, CancellationToken token)
+            {
+                _scheduler = scheduler;
+                _token = token;
+            }
+
+            public bool IsCompleted => _done;
+
+            public IAwaiter GetAwaiter() => this;
+
+            public void GetResult()
+            {
+                if (!_done)
+                {
+                    throw new InvalidOperationException(); // REVIEW: No support for blocking.
+                }
+
+                if (_error != null)
+                {
+                    _error.Throw();
+                }
+            }
+
+            public async void OnCompleted(Action continuation)
+            {
+                await _scheduler.ExecuteAsync(ct =>
+                {
+                    try
+                    {
+                        continuation();
+                    }
+                    catch (Exception ex)
+                    {
+                        _error = ExceptionDispatchInfo.Capture(ex);
+                    }
+                    finally
+                    {
+                        _done = true;
+                    }
+
+                    return Task.CompletedTask;
+                }, _token);
+            }
+        }
     }
 }