|
|
@@ -616,22 +616,8 @@ namespace System.Reactive.Linq
|
|
|
|
|
|
async ValueTask OnErrorAsync(Exception ex)
|
|
|
{
|
|
|
- var nullGroupLocal = default(IAsyncSubject<TElement>);
|
|
|
-
|
|
|
- lock (nullGate)
|
|
|
- {
|
|
|
- nullGroupLocal = nullGroup;
|
|
|
- }
|
|
|
-
|
|
|
- if (nullGroupLocal != null)
|
|
|
- {
|
|
|
- await nullGroupLocal.OnErrorAsync(ex).ConfigureAwait(false);
|
|
|
- }
|
|
|
-
|
|
|
- foreach (var group in groups.Values)
|
|
|
- {
|
|
|
- await group.OnErrorAsync(ex).ConfigureAwait(false);
|
|
|
- }
|
|
|
+ await ErrorAndRemoveNullGroupIfPresentAsync(ex);
|
|
|
+ await ErrorAndRemoveAllGroupsIfPresentAsync(ex);
|
|
|
|
|
|
using (await gate.LockAsync().ConfigureAwait(false))
|
|
|
{
|
|
|
@@ -770,23 +756,7 @@ namespace System.Reactive.Linq
|
|
|
await CompleteAndRemoveNullGroupIfPresentAsync();
|
|
|
}
|
|
|
|
|
|
- foreach (var key in groups.Keys)
|
|
|
- {
|
|
|
- // The ConcurrentDictionary's Keys property is a snapshot, so
|
|
|
- // although this TryRemove should always succeed for the first
|
|
|
- // key in the dictionary (as long as our upstream observable is
|
|
|
- // obeying the rules, and not making multiple concurrent calls
|
|
|
- // to our observer) each await in this loop offers an opportunity
|
|
|
- // for one of the group duration observables to complete, which
|
|
|
- // will cause the Expire method above to run, meaning that an
|
|
|
- // entry that was present when we retrieved Keys at the start of
|
|
|
- // this loop might already have been completed and removed by the
|
|
|
- // time this loop reaches it.
|
|
|
- if (groups.TryRemove(key, out var group))
|
|
|
- {
|
|
|
- await group.OnCompletedAsync().ConfigureAwait(false);
|
|
|
- }
|
|
|
- }
|
|
|
+ await CompleteAndRemoveAllGroupsIfPresentAsync();
|
|
|
|
|
|
using (await gate.LockAsync().ConfigureAwait(false))
|
|
|
{
|
|
|
@@ -797,7 +767,9 @@ namespace System.Reactive.Linq
|
|
|
refCount
|
|
|
);
|
|
|
|
|
|
- async ValueTask CompleteAndRemoveNullGroupIfPresentAsync()
|
|
|
+ ValueTask CompleteAndRemoveNullGroupIfPresentAsync() => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(null);
|
|
|
+ ValueTask ErrorAndRemoveNullGroupIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(x);
|
|
|
+ async ValueTask CompleteOrErrorAndRemoveNullGroupIfPresentAsync(Exception x)
|
|
|
{
|
|
|
var oldNullGroup = default(IAsyncSubject<TElement>);
|
|
|
|
|
|
@@ -809,9 +781,45 @@ namespace System.Reactive.Linq
|
|
|
|
|
|
if (oldNullGroup != null)
|
|
|
{
|
|
|
- await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
|
|
|
+ if (x is null)
|
|
|
+ {
|
|
|
+ await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ await oldNullGroup.OnErrorAsync(x).ConfigureAwait(false);
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
+ ValueTask CompleteAndRemoveAllGroupsIfPresentAsync() => CompleteOrErrorAndRemoveAllGroupsAsync(null);
|
|
|
+ ValueTask ErrorAndRemoveAllGroupsIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveAllGroupsAsync(x);
|
|
|
+ async ValueTask CompleteOrErrorAndRemoveAllGroupsAsync(Exception x)
|
|
|
+ {
|
|
|
+ foreach (var key in groups.Keys)
|
|
|
+ {
|
|
|
+ // The ConcurrentDictionary's Keys property is a snapshot, so
|
|
|
+ // although this TryRemove should always succeed for the first
|
|
|
+ // key in the dictionary (as long as our upstream observable is
|
|
|
+ // obeying the rules, and not making multiple concurrent calls
|
|
|
+ // to our observer) each await in this loop offers an opportunity
|
|
|
+ // for one of the group duration observables to complete, which
|
|
|
+ // will cause the Expire method above to run, meaning that an
|
|
|
+ // entry that was present when we retrieved Keys at the start of
|
|
|
+ // this loop might already have been completed and removed by the
|
|
|
+ // time this loop reaches it.
|
|
|
+ if (groups.TryRemove(key, out var group))
|
|
|
+ {
|
|
|
+ if (x is null)
|
|
|
+ {
|
|
|
+ await group.OnCompletedAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ await group.OnErrorAsync(x).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|