فهرست منبع

Implementing rendez-vous for ReplayAsyncSubject.

Bart De Smet 8 سال پیش
والد
کامیت
57e4aa0446

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Concurrency/System/Reactive/Concurrency/AsyncScheduler.cs

@@ -37,7 +37,7 @@ namespace System.Reactive.Concurrency
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return new TaskAwaitable(task, scheduler, token);
+            return new TaskAwaitable(task, false, scheduler, token);
         }
 
         public static IAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler scheduler) => RendezVous(task, scheduler, CancellationToken.None);
@@ -49,7 +49,7 @@ namespace System.Reactive.Concurrency
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return new TaskAwaitable<T>(task, scheduler, token);
+            return new TaskAwaitable<T>(task, false, scheduler, token);
         }
 
         public static async Task Delay(this IAsyncScheduler scheduler, TimeSpan dueTime, CancellationToken token = default(CancellationToken))

+ 4 - 4
AsyncRx.NET/System.Reactive.Async.Concurrency/System/Threading/Tasks/TaskAwaitable.cs

@@ -13,12 +13,12 @@ namespace System.Threading.Tasks
         private readonly IAsyncScheduler _scheduler;
         private readonly CancellationToken _token;
 
-        public TaskAwaitable(Task task, IAsyncScheduler scheduler, CancellationToken token)
+        public TaskAwaitable(Task task, bool continueOnCapturedContext, IAsyncScheduler scheduler, CancellationToken token)
         {
             if (task == null)
                 throw new ArgumentNullException(nameof(task));
 
-            _task = task.ConfigureAwait(false).GetAwaiter();
+            _task = task.ConfigureAwait(continueOnCapturedContext).GetAwaiter();
             _scheduler = scheduler;
             _token = token;
         }
@@ -86,12 +86,12 @@ namespace System.Threading.Tasks
         private readonly IAsyncScheduler _scheduler;
         private readonly CancellationToken _token;
 
-        public TaskAwaitable(Task<T> task, IAsyncScheduler scheduler, CancellationToken token)
+        public TaskAwaitable(Task<T> task, bool continueOnCapturedContext, IAsyncScheduler scheduler, CancellationToken token)
         {
             if (task == null)
                 throw new ArgumentNullException(nameof(task));
 
-            _task = task.ConfigureAwait(false).GetAwaiter();
+            _task = task.ConfigureAwait(continueOnCapturedContext).GetAwaiter();
             _scheduler = scheduler;
             _token = token;
         }

+ 2 - 0
AsyncRx.NET/System.Reactive.Async.Subjects/System.Reactive.Async.Subjects.csproj

@@ -5,6 +5,8 @@
   </PropertyGroup>
 
   <ItemGroup>
+    <ProjectReference Include="..\System.Reactive.Async.Concurrency\System.Reactive.Async.Concurrency.csproj" />
+    <ProjectReference Include="..\System.Reactive.Async.Core\System.Reactive.Async.Core.csproj" />
     <ProjectReference Include="..\System.Reactive.Async.Disposables\System.Reactive.Async.Disposables.csproj" />
     <ProjectReference Include="..\System.Reactive.Async.Interfaces\System.Reactive.Async.Interfaces.csproj" />
     <ProjectReference Include="..\System.Reactive.Bcl\System.Reactive.Bcl.csproj" />

+ 3 - 3
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/FastImmediateAsyncObserver.cs

@@ -3,7 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Disposables;
-using System.Runtime.CompilerServices;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive
@@ -19,9 +19,9 @@ namespace System.Reactive
 
         public override Task DisposeAsync() => _disposable.DisposeAsync();
 
-        protected override ConfiguredTaskAwaitable RendezVous(Task task) => task.ConfigureAwait(false);
+        protected override IAwaitable RendezVous(Task task) => new TaskAwaitable(task, false, null, CancellationToken.None);
 
-        protected override ConfiguredTaskAwaitable<R> RendezVous<R>(Task<R> task) => task.ConfigureAwait(false);
+        protected override IAwaitable<R> RendezVous<R>(Task<R> task) => new TaskAwaitable<R>(task, false, null, CancellationToken.None);
 
         protected override Task ScheduleAsync() => RunAsync(_disposable.Token);
     }

+ 3 - 5
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/ScheduledAsyncObserver.cs

@@ -4,7 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
-using System.Runtime.CompilerServices;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive
@@ -24,11 +24,9 @@ namespace System.Reactive
 
         public override Task DisposeAsync() => _disposable.DisposeAsync();
 
-        // TODO: Implement proper RendezVous semantics.
+        protected override IAwaitable RendezVous(Task task) => new TaskAwaitable(task, false, _scheduler, CancellationToken.None);
 
-        protected override ConfiguredTaskAwaitable RendezVous(Task task) => task.ConfigureAwait(false);
-
-        protected override ConfiguredTaskAwaitable<R> RendezVous<R>(Task<R> task) => task.ConfigureAwait(false);
+        protected override IAwaitable<R> RendezVous<R>(Task<R> task) => new TaskAwaitable<R>(task, false, _scheduler, CancellationToken.None);
 
         protected override async Task ScheduleAsync()
         {

+ 6 - 7
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/ScheduledAsyncObserverBase.cs

@@ -3,13 +3,12 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
-using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive
 {
-    internal abstract class ScheduledAsyncObserverBase<T> : IScheduledAsyncObserver<T>
+    internal abstract class ScheduledAsyncObserverBase<T> : AsyncObserverBase<T>, IScheduledAsyncObserver<T>
     {
         private readonly IAsyncObserver<T> _observer;
 
@@ -115,7 +114,7 @@ namespace System.Reactive
             }
         }
 
-        public async Task OnCompletedAsync()
+        protected override async Task OnCompletedAsyncCore()
         {
             using (await _lock.LockAsync().ConfigureAwait(false))
             {
@@ -126,7 +125,7 @@ namespace System.Reactive
             }
         }
 
-        public async Task OnErrorAsync(Exception error)
+        protected override async Task OnErrorAsyncCore(Exception error)
         {
             using (await _lock.LockAsync().ConfigureAwait(false))
             {
@@ -138,7 +137,7 @@ namespace System.Reactive
             }
         }
 
-        public async Task OnNextAsync(T value)
+        protected override async Task OnNextAsyncCore(T value)
         {
             using (await _lock.LockAsync().ConfigureAwait(false))
             {
@@ -149,9 +148,9 @@ namespace System.Reactive
             }
         }
 
-        protected abstract ConfiguredTaskAwaitable RendezVous(Task task);
+        protected abstract IAwaitable RendezVous(Task task);
 
-        protected abstract ConfiguredTaskAwaitable<R> RendezVous<R>(Task<R> task);
+        protected abstract IAwaitable<R> RendezVous<R>(Task<R> task);
 
         public abstract Task DisposeAsync();
     }