Răsfoiți Sursa

Fix AsyncRx GroupByUntil double OnCompletedAsync bug

Ian Griffiths 7 luni în urmă
părinte
comite
aa5a5349cf

+ 34 - 15
AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs

@@ -723,18 +723,7 @@ namespace System.Reactive.Linq
                                     {
                                         if (key == null)
                                         {
-                                            var oldNullGroup = default(IAsyncSubject<TElement>);
-
-                                            lock (nullGate)
-                                            {
-                                                oldNullGroup = nullGroup;
-                                                nullGroup = null;
-                                            }
-
-                                            if (oldNullGroup != null)
-                                            {
-                                                await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
-                                            }
+                                            await CompleteAndRemoveNullGroupIfPresentAsync();
                                         }
                                         else
                                         {
@@ -778,12 +767,25 @@ namespace System.Reactive.Linq
                             {
                                 if (nullGroup != null)
                                 {
-                                    await nullGroup.OnCompletedAsync().ConfigureAwait(false);
+                                    await CompleteAndRemoveNullGroupIfPresentAsync();
                                 }
 
-                                foreach (var group in groups.Values)
+                                foreach (var key in groups.Keys)
                                 {
-                                    await group.OnCompletedAsync().ConfigureAwait(false);
+                                    // The ConcurrentDictionary's Keys property is a snapshot, so
+                                    // although this TryRemove should always succeed for the first
+                                    // key in the dictionary (as long as our upstream observable is
+                                    // obeying the rules, and not making multiple concurrent calls
+                                    // to our observer) each await in this loop offers an opportunity
+                                    // for one of the group duration observables to complete, which
+                                    // will cause the Expire method above to run, meaning that an
+                                    // entry that was present when we retrieved Keys at the start of
+                                    // this loop might already have been completed and removed by the
+                                    // time this loop reaches it.
+                                    if (groups.TryRemove(key, out var group))
+                                    {
+                                        await group.OnCompletedAsync().ConfigureAwait(false);
+                                    }
                                 }
 
                                 using (await gate.LockAsync().ConfigureAwait(false))
@@ -794,6 +796,23 @@ namespace System.Reactive.Linq
                         ),
                         refCount
                     );
+
+                async ValueTask CompleteAndRemoveNullGroupIfPresentAsync()
+                {
+                    var oldNullGroup = default(IAsyncSubject<TElement>);
+
+                    lock (nullGate)
+                    {
+                        oldNullGroup = nullGroup;
+                        nullGroup = null;
+                    }
+
+                    if (oldNullGroup != null)
+                    {
+                        await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
+                    }
+
+                }
             }
         }
     }