Browse Source

Ensure observer only gets either completion or error, and only once

Ian Griffiths 5 months ago
parent
commit
ed1f538746
1 changed files with 15 additions and 3 deletions
  1. 15 3
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs

+ 15 - 3
AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs

@@ -613,6 +613,7 @@ namespace System.Reactive.Linq
 
                 var nullGate = new object();
                 var nullGroup = default(IAsyncSubject<TElement>);
+                bool observerComplete = false;
 
                 async ValueTask OnErrorAsync(Exception ex)
                 {
@@ -621,7 +622,11 @@ namespace System.Reactive.Linq
 
                     using (await gate.LockAsync().ConfigureAwait(false))
                     {
-                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                        if (!observerComplete)
+                        {
+                            observerComplete = true;
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false); 
+                        }
                     }
                 }
 
@@ -700,7 +705,10 @@ namespace System.Reactive.Linq
 
                                     using (await gate.LockAsync().ConfigureAwait(false))
                                     {
-                                        await observer.OnNextAsync(g).ConfigureAwait(false);
+                                        if (!observerComplete)
+                                        {
+                                            await observer.OnNextAsync(g).ConfigureAwait(false); 
+                                        }
                                     }
 
                                     var durationSubscription = new SingleAssignmentAsyncDisposable();
@@ -760,7 +768,11 @@ namespace System.Reactive.Linq
 
                                 using (await gate.LockAsync().ConfigureAwait(false))
                                 {
-                                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                                    if (!observerComplete)
+                                    {
+                                        observerComplete = true;
+                                        await observer.OnCompletedAsync().ConfigureAwait(false); 
+                                    }
                                 }
                             }
                         ),