Case.cs 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  9. public partial class AsyncObservable
  10. {
  11. public static IAsyncObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IAsyncObservable<TResult>> sources) => Case(selector, sources, Empty<TResult>());
  12. public static IAsyncObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IAsyncObservable<TResult>> sources, IAsyncScheduler scheduler) => Case(selector, sources, Empty<TResult>(scheduler));
  13. public static IAsyncObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IAsyncObservable<TResult>> sources, IAsyncObservable<TResult> defaultSource)
  14. {
  15. if (selector == null)
  16. throw new ArgumentNullException(nameof(selector));
  17. if (sources == null)
  18. throw new ArgumentNullException(nameof(sources));
  19. if (defaultSource == null)
  20. throw new ArgumentNullException(nameof(defaultSource));
  21. return Create<TResult>(observer =>
  22. {
  23. var source = default(IAsyncObservable<TResult>);
  24. try
  25. {
  26. var value = selector();
  27. if (!sources.TryGetValue(value, out source))
  28. {
  29. source = defaultSource;
  30. }
  31. }
  32. catch (Exception ex)
  33. {
  34. return Throw<TResult>(ex).SubscribeAsync(observer);
  35. }
  36. return source.SubscribeSafeAsync(observer);
  37. });
  38. }
  39. public static IAsyncObservable<TResult> Case<TValue, TResult>(Func<ValueTask<TValue>> selector, IDictionary<TValue, IAsyncObservable<TResult>> sources) => Case(selector, sources, Empty<TResult>());
  40. public static IAsyncObservable<TResult> Case<TValue, TResult>(Func<ValueTask<TValue>> selector, IDictionary<TValue, IAsyncObservable<TResult>> sources, IAsyncScheduler scheduler) => Case(selector, sources, Empty<TResult>(scheduler));
  41. public static IAsyncObservable<TResult> Case<TValue, TResult>(Func<ValueTask<TValue>> selector, IDictionary<TValue, IAsyncObservable<TResult>> sources, IAsyncObservable<TResult> defaultSource)
  42. {
  43. if (selector == null)
  44. throw new ArgumentNullException(nameof(selector));
  45. if (sources == null)
  46. throw new ArgumentNullException(nameof(sources));
  47. if (defaultSource == null)
  48. throw new ArgumentNullException(nameof(defaultSource));
  49. return Create<TResult>(async observer =>
  50. {
  51. var source = default(IAsyncObservable<TResult>);
  52. try
  53. {
  54. var value = await selector().ConfigureAwait(false);
  55. if (!sources.TryGetValue(value, out source))
  56. {
  57. source = defaultSource;
  58. }
  59. }
  60. catch (Exception ex)
  61. {
  62. return await Throw<TResult>(ex).SubscribeAsync(observer).ConfigureAwait(false);
  63. }
  64. return await source.SubscribeSafeAsync(observer).ConfigureAwait(false);
  65. });
  66. }
  67. }
  68. }