// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Reactive.Concurrency; using System.Threading.Tasks; namespace System.Reactive.Linq { public partial class AsyncObservable { public static IAsyncObservable Case(Func selector, IDictionary> sources) => Case(selector, sources, Empty()); public static IAsyncObservable Case(Func selector, IDictionary> sources, IAsyncScheduler scheduler) => Case(selector, sources, Empty(scheduler)); public static IAsyncObservable Case(Func selector, IDictionary> sources, IAsyncObservable defaultSource) { if (selector == null) throw new ArgumentNullException(nameof(selector)); if (sources == null) throw new ArgumentNullException(nameof(sources)); if (defaultSource == null) throw new ArgumentNullException(nameof(defaultSource)); return Create(observer => { var source = default(IAsyncObservable); try { var value = selector(); if (!sources.TryGetValue(value, out source)) { source = defaultSource; } } catch (Exception ex) { return Throw(ex).SubscribeAsync(observer); } return source.SubscribeSafeAsync(observer); }); } public static IAsyncObservable Case(Func> selector, IDictionary> sources) => Case(selector, sources, Empty()); public static IAsyncObservable Case(Func> selector, IDictionary> sources, IAsyncScheduler scheduler) => Case(selector, sources, Empty(scheduler)); public static IAsyncObservable Case(Func> selector, IDictionary> sources, IAsyncObservable defaultSource) { if (selector == null) throw new ArgumentNullException(nameof(selector)); if (sources == null) throw new ArgumentNullException(nameof(sources)); if (defaultSource == null) throw new ArgumentNullException(nameof(defaultSource)); return Create(async observer => { var source = default(IAsyncObservable); try { var value = await selector().ConfigureAwait(false); if (!sources.TryGetValue(value, out source)) { source = defaultSource; } } catch (Exception ex) { return await Throw(ex).SubscribeAsync(observer).ConfigureAwait(false); } return await source.SubscribeSafeAsync(observer).ConfigureAwait(false); }); } } }