// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace System.Reactive.Linq { public partial class AsyncObservable { public static IAsyncObservable> ToLookup(this IAsyncObservable source, Func keySelector, Func valueSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); return CreateAsyncObservable>.From( source, (keySelector, valueSelector), static (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.ToLookup(observer, state.keySelector, state.valueSelector))); } public static IAsyncObservable> ToLookup(this IAsyncObservable source, Func keySelector, Func valueSelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return CreateAsyncObservable>.From( source, (keySelector, valueSelector, comparer), static (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.ToLookup(observer, state.keySelector, state.valueSelector, state.comparer))); } public static IAsyncObservable> ToLookup(this IAsyncObservable source, Func> keySelector, Func> valueSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); return CreateAsyncObservable>.From( source, (keySelector, valueSelector), static (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.ToLookup(observer, state.keySelector, state.valueSelector))); } public static IAsyncObservable> ToLookup(this IAsyncObservable source, Func> keySelector, Func> valueSelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return CreateAsyncObservable>.From( source, (keySelector, valueSelector, comparer), static (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.ToLookup(observer, state.keySelector, state.valueSelector, state.comparer))); } } public partial class AsyncObserver { public static IAsyncObserver ToLookup(IAsyncObserver> observer, Func keySelector, Func valueSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); return ToLookup(observer, keySelector, valueSelector, EqualityComparer.Default); } public static IAsyncObserver ToLookup(IAsyncObserver> observer, Func keySelector, Func valueSelector, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Aggregate, ILookup>( observer, new Lookup(comparer), (d, x) => { var key = keySelector(x); var value = valueSelector(x); d.Add(key, value); return d; }, d => d ); } public static IAsyncObserver ToLookup(IAsyncObserver> observer, Func> keySelector, Func> valueSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); return ToLookup(observer, keySelector, valueSelector, EqualityComparer.Default); } public static IAsyncObserver ToLookup(IAsyncObserver> observer, Func> keySelector, Func> valueSelector, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Aggregate, ILookup>( observer, new Lookup(comparer), async (d, x) => { var key = await keySelector(x).ConfigureAwait(false); var value = await valueSelector(x).ConfigureAwait(false); d.Add(key, value); return d; }, d => new ValueTask>(d) ); } } }