// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Threading.Tasks; namespace System.Reactive.Linq { partial class AsyncObservable { public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func keySelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func keySelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, comparer))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func keySelector, int capacity) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, capacity))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func keySelector, int capacity, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, capacity, comparer))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func keySelector, Func elementSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, elementSelector))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func keySelector, Func elementSelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, elementSelector, comparer))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func keySelector, Func elementSelector, int capacity) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, elementSelector, capacity))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func keySelector, Func elementSelector, int capacity, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, elementSelector, capacity, comparer))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func> keySelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func> keySelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, comparer))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func> keySelector, int capacity) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, capacity))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func> keySelector, int capacity, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, capacity, comparer))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func> keySelector, Func> elementSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, elementSelector))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func> keySelector, Func> elementSelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, elementSelector, comparer))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func> keySelector, Func> elementSelector, int capacity) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, elementSelector, capacity))); } public static IAsyncObservable> GroupBy(this IAsyncObservable source, Func> keySelector, Func> elementSelector, int capacity, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector, elementSelector, capacity, comparer))); } private static async Task GroupByCore(IAsyncObservable source, IAsyncObserver> observer, Func>, IAsyncDisposable, (IAsyncObserver, IAsyncDisposable)> createObserver) { var d = new SingleAssignmentAsyncDisposable(); var (sink, subscription) = createObserver(observer, d); var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false); await d.AssignAsync(inner).ConfigureAwait(false); return subscription; } } partial class AsyncObserver { public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func keySelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); return GroupBy(observer, subscription, keySelector, int.MaxValue, EqualityComparer.Default); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func keySelector, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return GroupBy(observer, subscription, keySelector, int.MaxValue, comparer); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func keySelector, int capacity) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); return GroupBy(observer, subscription, keySelector, capacity, EqualityComparer.Default); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func keySelector, int capacity, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return GroupBy(observer, subscription, x => Task.FromResult(keySelector(x)), capacity, comparer); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func keySelector, Func elementSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, EqualityComparer.Default); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func keySelector, Func elementSelector, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, comparer); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func keySelector, Func elementSelector, int capacity) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); return GroupBy(observer, subscription, keySelector, elementSelector, capacity, EqualityComparer.Default); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func keySelector, Func elementSelector, int capacity, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return GroupBy(observer, subscription, x => Task.FromResult(keySelector(x)), x => Task.FromResult(elementSelector(x)), capacity, comparer); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func> keySelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); return GroupBy(observer, subscription, keySelector, int.MaxValue, EqualityComparer.Default); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func> keySelector, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return GroupBy(observer, subscription, keySelector, int.MaxValue, comparer); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func> keySelector, int capacity) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); return GroupBy(observer, subscription, keySelector, capacity, EqualityComparer.Default); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func> keySelector, int capacity, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return GroupBy(observer, subscription, keySelector, x => Task.FromResult(x), capacity, comparer); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func> keySelector, Func> elementSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, EqualityComparer.Default); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func> keySelector, Func> elementSelector, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, comparer); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func> keySelector, Func> elementSelector, int capacity) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); return GroupBy(observer, subscription, keySelector, elementSelector, capacity, EqualityComparer.Default); } public static (IAsyncObserver, IAsyncDisposable) GroupBy(IAsyncObserver> observer, IAsyncDisposable subscription, Func> keySelector, Func> elementSelector, int capacity, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (subscription == null) throw new ArgumentNullException(nameof(subscription)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); var refCount = new RefCountAsyncDisposable(subscription); var groups = default(Dictionary>); if (capacity == int.MaxValue) { groups = new Dictionary>(comparer); } else { groups = new Dictionary>(capacity, comparer); } var nullGroup = default(SequentialSimpleAsyncSubject); async Task OnErrorAsync(Exception ex) { if (nullGroup != null) { await nullGroup.OnErrorAsync(ex).ConfigureAwait(false); } foreach (var group in groups.Values) { await group.OnErrorAsync(ex).ConfigureAwait(false); } await observer.OnErrorAsync(ex).ConfigureAwait(false); } return ( Create ( async x => { var key = default(TKey); try { key = await keySelector(x).ConfigureAwait(false); } catch (Exception ex) { await OnErrorAsync(ex).ConfigureAwait(false); return; } var shouldEmit = false; var group = default(SequentialSimpleAsyncSubject); if (key == null) { if (nullGroup == null) { nullGroup = new SequentialSimpleAsyncSubject(); shouldEmit = true; } group = nullGroup; } else { try { if (!groups.TryGetValue(key, out group)) { group = new SequentialSimpleAsyncSubject(); groups.Add(key, group); shouldEmit = true; } } catch (Exception ex) { await OnErrorAsync(ex).ConfigureAwait(false); return; } } if (shouldEmit) { var g = new GroupedAsyncObservable(key, group, refCount); await observer.OnNextAsync(g).ConfigureAwait(false); } var element = default(TElement); try { element = await elementSelector(x).ConfigureAwait(false); } catch (Exception ex) { await OnErrorAsync(ex).ConfigureAwait(false); return; } await group.OnNextAsync(element).ConfigureAwait(false); }, OnErrorAsync, async () => { if (nullGroup != null) { await nullGroup.OnCompletedAsync().ConfigureAwait(false); } foreach (var group in groups.Values) { await group.OnCompletedAsync().ConfigureAwait(false); } await observer.OnCompletedAsync().ConfigureAwait(false); } ), refCount ); } } }