Selaa lähdekoodia

Improving setup of Amb.

Bart De Smet 8 vuotta sitten
vanhempi
sitoutus
7238840f50

+ 7 - 7
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Amb.cs

@@ -4,6 +4,7 @@
 
 using System.Reactive.Disposables;
 using System.Threading;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
@@ -18,18 +19,17 @@ namespace System.Reactive.Linq
 
             return Create<TSource>(async observer =>
             {
-                IAsyncDisposable firstSubscription = null;
-                IAsyncDisposable secondSubscription = null;
+                var firstSubscription = new SingleAssignmentAsyncDisposable();
+                var secondSubscription = new SingleAssignmentAsyncDisposable();
 
                 var (firstObserver, secondObserver) = AsyncObserver.Amb(observer, firstSubscription, secondSubscription);
 
-                var firstTask = first.SubscribeAsync(firstObserver);
-                var secondTask = second.SubscribeAsync(secondObserver);
+                var firstTask = first.SubscribeAsync(firstObserver).ContinueWith(d => firstSubscription.AssignAsync(d.Result));
+                var secondTask = second.SubscribeAsync(secondObserver).ContinueWith(d => secondSubscription.AssignAsync(d.Result));
 
-                var d1 = await firstTask.ConfigureAwait(false);
-                var d2 = await secondTask.ConfigureAwait(false);
+                await Task.WhenAll(firstTask, secondTask).ConfigureAwait(false);
 
-                return StableCompositeAsyncDisposable.Create(d1, d2);
+                return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);
             });
         }
     }