// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    // GroupByUntil operator surface.
    //
    // All overloads share the same behavior (implemented in AsyncObserver.GroupByUntil below):
    //  - every source element is mapped to a key; elements with equal keys are pushed into the same group;
    //  - when a key is seen for which no group is open, a new group is created, handed to durationSelector,
    //    and emitted to the downstream observer;
    //  - a group is completed and forgotten as soon as its duration sequence produces an element or completes,
    //    so a later element with the same key opens a fresh group;
    //  - an exception from any selector, from the source, or from a duration sequence is forwarded to every open
    //    group and then to the downstream observer.
    //
    // Overloads differ only in: synchronous vs. ValueTask-returning selectors, an optional element selector,
    // an optional initial capacity for the group table, and an optional key comparer.
    public partial class AsyncObservable
    {
        /// <summary>
        /// Groups the elements of <paramref name="source"/> by the key returned from <paramref name="keySelector"/>.
        /// A group is closed when the sequence returned by <paramref name="durationSelector"/> for that group
        /// produces an element or completes.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Computes the grouping key of an element.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
                source,
                (keySelector, durationSelector),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> by key until each group's duration sequence fires,
        /// matching keys with <paramref name="comparer"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Computes the grouping key of an element.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="comparer">Equality comparer used by the table of open groups.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
                source,
                (keySelector, durationSelector, comparer),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.comparer)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> by key until each group's duration sequence fires,
        /// pre-sizing the table of open groups to <paramref name="capacity"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Computes the grouping key of an element.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="capacity">Initial capacity of the table of open groups.</param>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
                source,
                (keySelector, durationSelector, capacity),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.capacity)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> by key until each group's duration sequence fires,
        /// with an explicit initial <paramref name="capacity"/> and key <paramref name="comparer"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Computes the grouping key of an element.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="capacity">Initial capacity of the table of open groups.</param>
        /// <param name="comparer">Equality comparer used by the table of open groups.</param>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
                source,
                (keySelector, durationSelector, capacity, comparer),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.capacity, state.comparer)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> by key until each group's duration sequence fires;
        /// the value pushed into a group is the result of <paramref name="elementSelector"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Computes the grouping key of an element.</param>
        /// <param name="elementSelector">Projects a source element to the value delivered to its group.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
                source,
                (keySelector, elementSelector, durationSelector),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector)));
        }

        /// <summary>
        /// Groups projected elements of <paramref name="source"/> by key until each group's duration sequence fires,
        /// matching keys with <paramref name="comparer"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Computes the grouping key of an element.</param>
        /// <param name="elementSelector">Projects a source element to the value delivered to its group.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="comparer">Equality comparer used by the table of open groups.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
                source,
                (keySelector, elementSelector, durationSelector, comparer),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.comparer)));
        }

        /// <summary>
        /// Groups projected elements of <paramref name="source"/> by key until each group's duration sequence fires,
        /// pre-sizing the table of open groups to <paramref name="capacity"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Computes the grouping key of an element.</param>
        /// <param name="elementSelector">Projects a source element to the value delivered to its group.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="capacity">Initial capacity of the table of open groups.</param>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
                source,
                (keySelector, elementSelector, durationSelector, capacity),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.capacity)));
        }

        /// <summary>
        /// Groups projected elements of <paramref name="source"/> by key until each group's duration sequence fires,
        /// with an explicit initial <paramref name="capacity"/> and key <paramref name="comparer"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Computes the grouping key of an element.</param>
        /// <param name="elementSelector">Projects a source element to the value delivered to its group.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="capacity">Initial capacity of the table of open groups.</param>
        /// <param name="comparer">Equality comparer used by the table of open groups.</param>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
                source,
                (keySelector, elementSelector, durationSelector, capacity, comparer),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.capacity, state.comparer)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> by the key asynchronously computed by
        /// <paramref name="keySelector"/>. A group is closed when the sequence returned by
        /// <paramref name="durationSelector"/> for that group produces an element or completes.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Asynchronously computes the grouping key of an element.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
                source,
                (keySelector, durationSelector),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> by an asynchronously computed key until each group's
        /// duration sequence fires, matching keys with <paramref name="comparer"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Asynchronously computes the grouping key of an element.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="comparer">Equality comparer used by the table of open groups.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
                source,
                (keySelector, durationSelector, comparer),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.comparer)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> by an asynchronously computed key until each group's
        /// duration sequence fires, pre-sizing the table of open groups to <paramref name="capacity"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Asynchronously computes the grouping key of an element.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="capacity">Initial capacity of the table of open groups.</param>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
                source,
                (keySelector, durationSelector, capacity),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.capacity)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> by an asynchronously computed key until each group's
        /// duration sequence fires, with an explicit initial <paramref name="capacity"/> and key <paramref name="comparer"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Asynchronously computes the grouping key of an element.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="capacity">Initial capacity of the table of open groups.</param>
        /// <param name="comparer">Equality comparer used by the table of open groups.</param>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
                source,
                (keySelector, durationSelector, capacity, comparer),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.capacity, state.comparer)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> by an asynchronously computed key until each group's
        /// duration sequence fires; the value pushed into a group is the awaited result of <paramref name="elementSelector"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Asynchronously computes the grouping key of an element.</param>
        /// <param name="elementSelector">Asynchronously projects a source element to the value delivered to its group.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
                source,
                (keySelector, elementSelector, durationSelector),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector)));
        }

        /// <summary>
        /// Groups asynchronously projected elements of <paramref name="source"/> by an asynchronously computed key
        /// until each group's duration sequence fires, matching keys with <paramref name="comparer"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Asynchronously computes the grouping key of an element.</param>
        /// <param name="elementSelector">Asynchronously projects a source element to the value delivered to its group.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="comparer">Equality comparer used by the table of open groups.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
                source,
                (keySelector, elementSelector, durationSelector, comparer),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.comparer)));
        }

        /// <summary>
        /// Groups asynchronously projected elements of <paramref name="source"/> by an asynchronously computed key
        /// until each group's duration sequence fires, pre-sizing the table of open groups to <paramref name="capacity"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Asynchronously computes the grouping key of an element.</param>
        /// <param name="elementSelector">Asynchronously projects a source element to the value delivered to its group.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="capacity">Initial capacity of the table of open groups.</param>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
                source,
                (keySelector, elementSelector, durationSelector, capacity),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.capacity)));
        }

        /// <summary>
        /// Groups asynchronously projected elements of <paramref name="source"/> by an asynchronously computed key
        /// until each group's duration sequence fires, with an explicit initial <paramref name="capacity"/> and
        /// key <paramref name="comparer"/>.
        /// </summary>
        /// <param name="source">Sequence whose elements are grouped.</param>
        /// <param name="keySelector">Asynchronously computes the grouping key of an element.</param>
        /// <param name="elementSelector">Asynchronously projects a source element to the value delivered to its group.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first notification closes the group.</param>
        /// <param name="capacity">Initial capacity of the table of open groups.</param>
        /// <param name="comparer">Equality comparer used by the table of open groups.</param>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
                source,
                (keySelector, elementSelector, durationSelector, capacity, comparer),
                static (source, state, observer) => GroupByUntilCore(
                    source,
                    observer,
                    (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.capacity, state.comparer)));
        }

        /// <summary>
        /// Shared subscription logic for every GroupByUntil overload: builds the grouping observer, subscribes it to
        /// <paramref name="source"/>, and returns the disposable produced by <paramref name="createObserver"/>.
        /// </summary>
        /// <param name="source">Sequence to subscribe to.</param>
        /// <param name="observer">Downstream observer that receives the groups.</param>
        /// <param name="createObserver">
        /// Factory that receives the downstream observer and a placeholder for the source subscription, and returns
        /// the observer to attach to the source together with the disposable to hand back to the subscriber.
        /// </param>
        private static async ValueTask<IAsyncDisposable> GroupByUntilCore<TSource, TKey, TElement>(IAsyncObservable<TSource> source, IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, Func<IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>>, IAsyncDisposable, ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserver)
        {
            // The sink needs a handle to the source subscription before that subscription exists,
            // so a single-assignment placeholder is passed in and filled once SubscribeSafeAsync returns.
            var d = new SingleAssignmentAsyncDisposable();

            var (sink, subscription) = await createObserver(observer, d).ConfigureAwait(false);

            var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
            await d.AssignAsync(inner).ConfigureAwait(false);

            return subscription;
        }
    }

    // Observer-side factories for GroupByUntil. Every overload validates its arguments and forwards to the most
    // general overload (ValueTask-returning key and element selectors, explicit capacity and comparer).
    // Overloads without a capacity pass int.MaxValue, which that overload treats as "no explicit capacity";
    // overloads without a comparer pass EqualityComparer<TKey>.Default.
    public partial class AsyncObserver
    {
        /// <summary>
        /// Creates the grouping observer for GroupByUntil with a synchronous key selector, no capacity hint and the default key comparer.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));

            return GroupByUntil(observer, subscription, keySelector, durationSelector, int.MaxValue, EqualityComparer<TKey>.Default);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with a synchronous key selector and an explicit key comparer.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return GroupByUntil(observer, subscription, keySelector, durationSelector, int.MaxValue, comparer);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with a synchronous key selector and an explicit initial capacity.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));

            return GroupByUntil(observer, subscription, keySelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with a synchronous key selector, explicit capacity and comparer.
        /// The key selector is wrapped into a ValueTask-returning delegate and forwarded to the asynchronous overload.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return GroupByUntil<TSource, TKey, TDuration>(observer, subscription, x => new ValueTask<TKey>(keySelector(x)), durationSelector, capacity, comparer);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with synchronous key and element selectors, no capacity hint and the default key comparer.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));

            return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, EqualityComparer<TKey>.Default);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with synchronous key and element selectors and an explicit key comparer.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, comparer);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with synchronous key and element selectors and an explicit initial capacity.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));

            return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with synchronous key and element selectors, explicit capacity and comparer.
        /// Both selectors are wrapped into ValueTask-returning delegates and forwarded to the asynchronous overload.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return GroupByUntil<TSource, TKey, TElement, TDuration>(observer, subscription, x => new ValueTask<TKey>(keySelector(x)), x => new ValueTask<TElement>(elementSelector(x)), durationSelector, capacity, comparer);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with an asynchronous key selector, no capacity hint and the default key comparer.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));

            return GroupByUntil(observer, subscription, keySelector, durationSelector, int.MaxValue, EqualityComparer<TKey>.Default);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with an asynchronous key selector and an explicit key comparer.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return GroupByUntil(observer, subscription, keySelector, durationSelector, int.MaxValue, comparer);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with an asynchronous key selector and an explicit initial capacity.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));

            return GroupByUntil(observer, subscription, keySelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with an asynchronous key selector, explicit capacity and comparer.
        /// Source elements are delivered to their group unchanged (identity element selector).
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return GroupByUntil<TSource, TKey, TSource, TDuration>(observer, subscription, keySelector, x => new ValueTask<TSource>(x), durationSelector, capacity, comparer);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with asynchronous key and element selectors, no capacity hint and the default key comparer.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));

            return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, EqualityComparer<TKey>.Default);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with asynchronous key and element selectors and an explicit key comparer.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, comparer);
        }

        /// <summary>
        /// Creates the grouping observer for GroupByUntil with asynchronous key and element selectors and an explicit initial capacity.
        /// </summary>
        /// <returns>The observer to subscribe to the source, and the disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));

            return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
        }

        /// <summary>
        /// Most general GroupByUntil observer factory; every other overload forwards here.
        /// </summary>
        /// <param name="observer">Downstream observer that receives each newly opened group.</param>
        /// <param name="subscription">Disposable for the source subscription; it is added to the composite wrapped by the returned disposable.</param>
        /// <param name="keySelector">Asynchronously computes the grouping key of an element.</param>
        /// <param name="elementSelector">Asynchronously projects a source element to the value delivered to its group.</param>
        /// <param name="durationSelector">Returns, for a newly opened group, the sequence whose first element or completion closes the group.</param>
        /// <param name="capacity">Initial capacity of the table of open groups; <see cref="int.MaxValue"/> means "no explicit capacity".</param>
        /// <param name="comparer">Equality comparer used by the table of open groups (a <c>null</c> key is tracked outside that table).</param>
        /// <returns>The observer to subscribe to the source, and a ref-counted disposable to return to the subscriber.</returns>
        /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (subscription == null) throw new ArgumentNullException(nameof(subscription));
            if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
            if (durationSelector == null) throw new ArgumentNullException(nameof(durationSelector));
            if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));
            if (comparer == null) throw new ArgumentNullException(nameof(comparer));

            return CoreAsync();

            // REVIEW: Concurrent execution of a duration callback and an event could lead to an OnNext call being queued in an AsyncLockObserver
            //         after a duration callback makes an OnCompleted call. This seems to be the case in sync Rx as well.

            async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
            {
                // Holds the source subscription plus one subscription per live duration sequence.
                var d = new CompositeAsyncDisposable();
                await d.AddAsync(subscription).ConfigureAwait(false);

                var refCount = new RefCountAsyncDisposable(d);

                // int.MaxValue is what the capacity-less overloads pass; in that case let the dictionary pick its own defaults.
                var groups = capacity == int.MaxValue
                    ? new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(comparer)
                    : new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(Environment.ProcessorCount * 4, capacity, comparer);

                // Serializes notifications sent to the downstream observer.
                var gate = new AsyncGate();

                // ConcurrentDictionary does not accept null keys, so the group for a null key lives in its own
                // variable. Duration callbacks can run concurrently with source callbacks, so every access to
                // nullGroup goes through nullGate.
                var nullGate = new object();
                var nullGroup = default(IAsyncSubject<TElement>);

                // Reads nullGroup under the lock so callers work on a stable local copy.
                IAsyncSubject<TElement> SnapshotNullGroup()
                {
                    lock (nullGate)
                    {
                        return nullGroup;
                    }
                }

                // Forwards an error to every open group, then to the downstream observer.
                async ValueTask OnErrorAsync(Exception ex)
                {
                    var nullGroupLocal = SnapshotNullGroup();

                    if (nullGroupLocal != null)
                    {
                        await nullGroupLocal.OnErrorAsync(ex).ConfigureAwait(false);
                    }

                    foreach (var group in groups.Values)
                    {
                        await group.OnErrorAsync(ex).ConfigureAwait(false);
                    }

                    using (await gate.LockAsync().ConfigureAwait(false))
                    {
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                }

                return
                (
                    Create<TSource>
                    (
                        async x =>
                        {
                            var key = default(TKey);

                            try
                            {
                                key = await keySelector(x).ConfigureAwait(false);
                            }
                            catch (Exception ex)
                            {
                                await OnErrorAsync(ex).ConfigureAwait(false);
                                return;
                            }

                            // True when this element opened a new group that still has to be announced downstream.
                            var shouldEmit = false;
                            var group = default(IAsyncSubject<TElement>);

                            if (key == null)
                            {
                                lock (nullGate)
                                {
                                    if (nullGroup == null)
                                    {
                                        var subject = new SequentialSimpleAsyncSubject<TElement>();
                                        nullGroup = AsyncSubject.Create<TElement>(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);
                                        shouldEmit = true;
                                    }

                                    // Capture while still holding the lock: a concurrent Expire may reset nullGroup
                                    // to null right after the lock is released, which would leave 'group' null.
                                    group = nullGroup;
                                }
                            }
                            else
                            {
                                try
                                {
                                    // The comparer is user code and may throw from either dictionary call.
                                    if (!groups.TryGetValue(key, out group))
                                    {
                                        var subject = new SequentialSimpleAsyncSubject<TElement>();
                                        group = AsyncSubject.Create<TElement>(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);

                                        if (groups.TryAdd(key, group))
                                        {
                                            shouldEmit = true;
                                        }
                                    }
                                }
                                catch (Exception ex)
                                {
                                    await OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }
                            }

                            if (shouldEmit)
                            {
                                var g = new GroupedAsyncObservable<TKey, TElement>(key, group, refCount);

                                var duration = default(IAsyncObservable<TDuration>);

                                try
                                {
                                    duration = durationSelector(g);
                                }
                                catch (Exception ex)
                                {
                                    await OnErrorAsync(ex).ConfigureAwait(false);
                                    return;
                                }

                                using (await gate.LockAsync().ConfigureAwait(false))
                                {
                                    await observer.OnNextAsync(g).ConfigureAwait(false);
                                }

                                var durationSubscription = new SingleAssignmentAsyncDisposable();

                                // Closes the group for 'key': detaches it from the bookkeeping, completes it,
                                // and tears down the duration subscription.
                                async ValueTask Expire()
                                {
                                    if (key == null)
                                    {
                                        var oldNullGroup = default(IAsyncSubject<TElement>);

                                        lock (nullGate)
                                        {
                                            oldNullGroup = nullGroup;
                                            nullGroup = null;
                                        }

                                        if (oldNullGroup != null)
                                        {
                                            await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
                                        }
                                    }
                                    else
                                    {
                                        if (groups.TryRemove(key, out var oldGroup))
                                        {
                                            await oldGroup.OnCompletedAsync().ConfigureAwait(false);
                                        }
                                    }

                                    await durationSubscription.DisposeAsync().ConfigureAwait(false);
                                    await d.RemoveAsync(durationSubscription).ConfigureAwait(false);
                                }

                                // Either the first element or the completion of the duration sequence expires the group.
                                var durationObserver = Create<TDuration>(
                                    y => Expire(),
                                    OnErrorAsync,
                                    Expire
                                );

                                await d.AddAsync(durationSubscription).ConfigureAwait(false);

                                var durationSubscriptionInner = await duration.SubscribeSafeAsync(durationObserver).ConfigureAwait(false);
                                await durationSubscription.AssignAsync(durationSubscriptionInner).ConfigureAwait(false);
                            }

                            var element = default(TElement);

                            try
                            {
                                element = await elementSelector(x).ConfigureAwait(false);
                            }
                            catch (Exception ex)
                            {
                                await OnErrorAsync(ex).ConfigureAwait(false);
                                return;
                            }

                            await group.OnNextAsync(element).ConfigureAwait(false);
                        },
                        OnErrorAsync,
                        async () =>
                        {
                            // Use a locked snapshot (as OnErrorAsync does) instead of reading the field twice:
                            // a concurrent Expire could null it between the check and the call.
                            var nullGroupLocal = SnapshotNullGroup();

                            if (nullGroupLocal != null)
                            {
                                await nullGroupLocal.OnCompletedAsync().ConfigureAwait(false);
                            }

                            foreach (var group in groups.Values)
                            {
                                await group.OnCompletedAsync().ConfigureAwait(false);
                            }

                            using (await gate.LockAsync().ConfigureAwait(false))
                            {
                                await observer.OnCompletedAsync().ConfigureAwait(false);
                            }
                        }
                    ),
                    refCount
                );
            }
        }
    }
}