Bläddra i källkod

Implementing Amb.

Bart De Smet 8 år sedan
förälder
incheckning
36d5ed96b2

+ 69 - 2
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Amb.cs

@@ -47,6 +47,8 @@ namespace System.Reactive.Linq
 
             var gate = new AsyncLock();
 
+            var state = AmbState.None;
+
             return
                 (
                     Create<TSource>(
@@ -54,19 +56,48 @@ namespace System.Reactive.Linq
                         {
                             using (await gate.LockAsync().ConfigureAwait(false))
                             {
+                                if (state == AmbState.None)
+                                {
+                                    state = AmbState.First;
+                                    await second.DisposeAsync().ConfigureAwait(false);
+                                }
+
+                                if (state == AmbState.First)
+                                {
+                                    await observer.OnNextAsync(x).ConfigureAwait(false);
+                                }
                             }
                         },
                         async ex =>
                         {
                             using (await gate.LockAsync().ConfigureAwait(false))
                             {
-                                await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                                if (state == AmbState.None)
+                                {
+                                    state = AmbState.First;
+                                    await second.DisposeAsync().ConfigureAwait(false);
+                                }
+
+                                if (state == AmbState.First)
+                                {
+                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                                }
                             }
                         },
                         async () =>
                         {
                             using (await gate.LockAsync().ConfigureAwait(false))
                             {
+                                if (state == AmbState.None)
+                                {
+                                    state = AmbState.First;
+                                    await second.DisposeAsync().ConfigureAwait(false);
+                                }
+
+                                if (state == AmbState.First)
+                                {
+                                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                                }
                             }
                         }
                     ),
@@ -75,23 +106,59 @@ namespace System.Reactive.Linq
                         {
                             using (await gate.LockAsync().ConfigureAwait(false))
                             {
+                                if (state == AmbState.None)
+                                {
+                                    state = AmbState.Second;
+                                    await first.DisposeAsync().ConfigureAwait(false);
+                                }
+
+                                if (state == AmbState.Second)
+                                {
+                                    await observer.OnNextAsync(x).ConfigureAwait(false);
+                                }
                             }
                         },
                         async ex =>
                         {
                             using (await gate.LockAsync().ConfigureAwait(false))
                             {
-                                await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                                if (state == AmbState.None)
+                                {
+                                    state = AmbState.Second;
+                                    await first.DisposeAsync().ConfigureAwait(false);
+                                }
+
+                                if (state == AmbState.Second)
+                                {
+                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                                }
                             }
                         },
                         async () =>
                         {
                             using (await gate.LockAsync().ConfigureAwait(false))
                             {
+                                if (state == AmbState.None)
+                                {
+                                    state = AmbState.Second;
+                                    await first.DisposeAsync().ConfigureAwait(false);
+                                }
+
+                                if (state == AmbState.Second)
+                                {
+                                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                                }
                             }
                         }
                     )
                 );
         }
+
+        private enum AmbState
+        {
+            None,
+            First,
+            Second,
+        }
     }
 }