// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Threading.Tasks; namespace System.Reactive.Linq { partial class AsyncObservable { public static IAsyncObservable> ToDictionary(this IAsyncObservable source, Func keySelector, Func valueSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); return Create>(observer => source.SubscribeSafeAsync(AsyncObserver.ToDictionary(observer, keySelector, valueSelector))); } public static IAsyncObservable> ToDictionary(this IAsyncObservable source, Func keySelector, Func valueSelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => source.SubscribeSafeAsync(AsyncObserver.ToDictionary(observer, keySelector, valueSelector, comparer))); } public static IAsyncObservable> ToDictionary(this IAsyncObservable source, Func> keySelector, Func> valueSelector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); return Create>(observer => source.SubscribeSafeAsync(AsyncObserver.ToDictionary(observer, keySelector, valueSelector))); } public static IAsyncObservable> ToDictionary(this IAsyncObservable source, Func> keySelector, Func> valueSelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Create>(observer => source.SubscribeSafeAsync(AsyncObserver.ToDictionary(observer, keySelector, valueSelector, comparer))); } } partial class AsyncObserver { public static IAsyncObserver ToDictionary(IAsyncObserver> observer, Func keySelector, Func valueSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); return ToDictionary(observer, keySelector, valueSelector, EqualityComparer.Default); } public static IAsyncObserver ToDictionary(IAsyncObserver> observer, Func keySelector, Func valueSelector, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Aggregate, IDictionary>( observer, new Dictionary(comparer), (d, x) => { var key = keySelector(x); var value = valueSelector(x); d.Add(key, value); return d; }, d => d ); } public static IAsyncObserver ToDictionary(IAsyncObserver> observer, Func> keySelector, Func> valueSelector) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); return ToDictionary(observer, keySelector, valueSelector, EqualityComparer.Default); } public static IAsyncObserver ToDictionary(IAsyncObserver> observer, Func> keySelector, Func> valueSelector, IEqualityComparer comparer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); if (comparer == null) throw new ArgumentNullException(nameof(comparer)); return Aggregate, IDictionary>( observer, new Dictionary(comparer), async (d, x) => { var key = await keySelector(x).ConfigureAwait(false); var value = await valueSelector(x).ConfigureAwait(false); d.Add(key, value); return d; }, d => Task.FromResult>(d) ); } } }