ToDictionary.cs 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading.Tasks;
  6. namespace System.Reactive.Linq
  7. {
  /// <summary>
  /// <c>ToDictionary</c> operators for asynchronous observable sequences.
  /// </summary>
  /// <remarks>
  /// Every overload validates its arguments eagerly and then defers to <c>Create</c> (declared in another
  /// part of this partial class). The subscribe lambda handed to <c>Create</c> uses only its own
  /// parameters, so it does not capture anything from the enclosing method; the selectors (and comparer)
  /// travel in the <c>state</c> tuple instead.
  /// NOTE(review): the <c>default(IDictionary&lt;TKey, TValue&gt;)</c> argument presumably exists only so the
  /// compiler can infer the result element type of <c>Create</c> - confirm against its declaration.
  /// </remarks>
  public partial class AsyncObservable
  {
      /// <summary>
      /// Returns a sequence which, when subscribed to, subscribes to <paramref name="source"/> with the observer
      /// built by <c>AsyncObserver.ToDictionary</c> from the given synchronous selectors.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Type of the dictionary keys.</typeparam>
      /// <typeparam name="TValue">Type of the dictionary values.</typeparam>
      /// <param name="source">Sequence whose elements are turned into dictionary entries.</param>
      /// <param name="keySelector">Computes the key for an element.</param>
      /// <param name="valueSelector">Computes the value for an element.</param>
      /// <returns>An async observable of <see cref="IDictionary{TKey, TValue}"/>.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<IDictionary<TKey, TValue>> ToDictionary<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          return Create(
              source,
              (keySelector, valueSelector),
              default(IDictionary<TKey, TValue>),
              (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.ToDictionary(observer, state.keySelector, state.valueSelector)));
      }

      /// <summary>
      /// Returns a sequence which, when subscribed to, subscribes to <paramref name="source"/> with the observer
      /// built by <c>AsyncObserver.ToDictionary</c> from the given synchronous selectors and key comparer.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Type of the dictionary keys.</typeparam>
      /// <typeparam name="TValue">Type of the dictionary values.</typeparam>
      /// <param name="source">Sequence whose elements are turned into dictionary entries.</param>
      /// <param name="keySelector">Computes the key for an element.</param>
      /// <param name="valueSelector">Computes the value for an element.</param>
      /// <param name="comparer">Equality comparer forwarded to the observer factory for the keys.</param>
      /// <returns>An async observable of <see cref="IDictionary{TKey, TValue}"/>.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<IDictionary<TKey, TValue>> ToDictionary<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer == null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // The original ended this statement with "; ;" - the stray empty statement has been removed so the
          // overload matches its three siblings.
          return Create(
              source,
              (keySelector, valueSelector, comparer),
              default(IDictionary<TKey, TValue>),
              (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.ToDictionary(observer, state.keySelector, state.valueSelector, state.comparer)));
      }

      /// <summary>
      /// Returns a sequence which, when subscribed to, subscribes to <paramref name="source"/> with the observer
      /// built by <c>AsyncObserver.ToDictionary</c> from the given asynchronous selectors.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Type of the dictionary keys.</typeparam>
      /// <typeparam name="TValue">Type of the dictionary values.</typeparam>
      /// <param name="source">Sequence whose elements are turned into dictionary entries.</param>
      /// <param name="keySelector">Asynchronously computes the key for an element.</param>
      /// <param name="valueSelector">Asynchronously computes the value for an element.</param>
      /// <returns>An async observable of <see cref="IDictionary{TKey, TValue}"/>.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<IDictionary<TKey, TValue>> ToDictionary<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TValue>> valueSelector)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          return Create(
              source,
              (keySelector, valueSelector),
              default(IDictionary<TKey, TValue>),
              (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.ToDictionary(observer, state.keySelector, state.valueSelector)));
      }

      /// <summary>
      /// Returns a sequence which, when subscribed to, subscribes to <paramref name="source"/> with the observer
      /// built by <c>AsyncObserver.ToDictionary</c> from the given asynchronous selectors and key comparer.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Type of the dictionary keys.</typeparam>
      /// <typeparam name="TValue">Type of the dictionary values.</typeparam>
      /// <param name="source">Sequence whose elements are turned into dictionary entries.</param>
      /// <param name="keySelector">Asynchronously computes the key for an element.</param>
      /// <param name="valueSelector">Asynchronously computes the value for an element.</param>
      /// <param name="comparer">Equality comparer forwarded to the observer factory for the keys.</param>
      /// <returns>An async observable of <see cref="IDictionary{TKey, TValue}"/>.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<IDictionary<TKey, TValue>> ToDictionary<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TValue>> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer == null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          return Create(
              source,
              (keySelector, valueSelector, comparer),
              default(IDictionary<TKey, TValue>),
              (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.ToDictionary(observer, state.keySelector, state.valueSelector, state.comparer)));
      }
  }
  /// <summary>
  /// Observer-side <c>ToDictionary</c> factories.
  /// </summary>
  /// <remarks>
  /// The overloads without a comparer forward to the comparer overloads with
  /// <see cref="EqualityComparer{T}.Default"/>. The comparer overloads hand a new
  /// <see cref="Dictionary{TKey, TValue}"/> as seed to <c>Aggregate</c> (declared in another part of this
  /// partial class) together with an accumulator that calls <see cref="Dictionary{TKey, TValue}.Add"/>.
  /// <c>Add</c> throws for a key that is already present; how that exception reaches the downstream
  /// observer is decided by <c>Aggregate</c>, which is not visible here.
  /// </remarks>
  public partial class AsyncObserver
  {
      /// <summary>
      /// Creates an observer of <typeparamref name="TSource"/> that builds a dictionary with the default key
      /// comparer, using synchronous selectors.
      /// </summary>
      /// <typeparam name="TSource">Element type the returned observer accepts.</typeparam>
      /// <typeparam name="TKey">Type of the dictionary keys.</typeparam>
      /// <typeparam name="TValue">Type of the dictionary values.</typeparam>
      /// <param name="observer">Observer that receives the dictionary.</param>
      /// <param name="keySelector">Computes the key for an element.</param>
      /// <param name="valueSelector">Computes the value for an element.</param>
      /// <returns>The observer produced by the comparer overload.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToDictionary<TSource, TKey, TValue>(IAsyncObserver<IDictionary<TKey, TValue>> observer, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          var defaultComparer = EqualityComparer<TKey>.Default;

          return ToDictionary(observer, keySelector, valueSelector, defaultComparer);
      }

      /// <summary>
      /// Creates an observer of <typeparamref name="TSource"/> that adds one entry per element to a
      /// dictionary constructed with <paramref name="comparer"/>, using synchronous selectors.
      /// </summary>
      /// <typeparam name="TSource">Element type the returned observer accepts.</typeparam>
      /// <typeparam name="TKey">Type of the dictionary keys.</typeparam>
      /// <typeparam name="TValue">Type of the dictionary values.</typeparam>
      /// <param name="observer">Observer that receives the dictionary.</param>
      /// <param name="keySelector">Computes the key for an element.</param>
      /// <param name="valueSelector">Computes the value for an element.</param>
      /// <param name="comparer">Equality comparer the dictionary is constructed with.</param>
      /// <returns>The observer returned by <c>Aggregate</c>.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToDictionary<TSource, TKey, TValue>(IAsyncObserver<IDictionary<TKey, TValue>> observer, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // The seed is created once, when this factory is called.
          var seed = new Dictionary<TKey, TValue>(comparer);

          return Aggregate<TSource, Dictionary<TKey, TValue>, IDictionary<TKey, TValue>>(
              observer,
              seed,
              (dictionary, element) =>
              {
                  // Arguments are evaluated left to right: key first, then value.
                  dictionary.Add(keySelector(element), valueSelector(element));
                  return dictionary;
              },
              dictionary => dictionary);
      }

      /// <summary>
      /// Creates an observer of <typeparamref name="TSource"/> that builds a dictionary with the default key
      /// comparer, using asynchronous selectors.
      /// </summary>
      /// <typeparam name="TSource">Element type the returned observer accepts.</typeparam>
      /// <typeparam name="TKey">Type of the dictionary keys.</typeparam>
      /// <typeparam name="TValue">Type of the dictionary values.</typeparam>
      /// <param name="observer">Observer that receives the dictionary.</param>
      /// <param name="keySelector">Asynchronously computes the key for an element.</param>
      /// <param name="valueSelector">Asynchronously computes the value for an element.</param>
      /// <returns>The observer produced by the comparer overload.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToDictionary<TSource, TKey, TValue>(IAsyncObserver<IDictionary<TKey, TValue>> observer, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TValue>> valueSelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          var defaultComparer = EqualityComparer<TKey>.Default;

          return ToDictionary(observer, keySelector, valueSelector, defaultComparer);
      }

      /// <summary>
      /// Creates an observer of <typeparamref name="TSource"/> that adds one entry per element to a
      /// dictionary constructed with <paramref name="comparer"/>, awaiting the key selector and then the
      /// value selector for each element.
      /// </summary>
      /// <typeparam name="TSource">Element type the returned observer accepts.</typeparam>
      /// <typeparam name="TKey">Type of the dictionary keys.</typeparam>
      /// <typeparam name="TValue">Type of the dictionary values.</typeparam>
      /// <param name="observer">Observer that receives the dictionary.</param>
      /// <param name="keySelector">Asynchronously computes the key for an element.</param>
      /// <param name="valueSelector">Asynchronously computes the value for an element.</param>
      /// <param name="comparer">Equality comparer the dictionary is constructed with.</param>
      /// <returns>The observer returned by <c>Aggregate</c>.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToDictionary<TSource, TKey, TValue>(IAsyncObserver<IDictionary<TKey, TValue>> observer, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TValue>> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // The seed is created once, when this factory is called.
          var seed = new Dictionary<TKey, TValue>(comparer);

          return Aggregate<TSource, Dictionary<TKey, TValue>, IDictionary<TKey, TValue>>(
              observer,
              seed,
              async (dictionary, element) =>
              {
                  // The key is awaited before the value selector is invoked.
                  var elementKey = await keySelector(element).ConfigureAwait(false);
                  var elementValue = await valueSelector(element).ConfigureAwait(false);

                  dictionary.Add(elementKey, elementValue);
                  return dictionary;
              },
              dictionary => new ValueTask<IDictionary<TKey, TValue>>(dictionary));
      }
  }
  140. }