ToLookup.cs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  /// <summary>
  /// <c>ToLookup</c> operators over <see cref="IAsyncObservable{T}"/>. Every overload validates its arguments and
  /// then hands <c>CreateAsyncObservable.From</c> a non-capturing callback that subscribes the source with the
  /// matching <c>AsyncObserver.ToLookup</c> observer.
  /// </summary>
  public partial class AsyncObservable
  {
      /// <summary>
      /// Returns a sequence of <see cref="ILookup{TKey, TElement}"/> built from <paramref name="source"/> using
      /// synchronous key and value selectors. No comparer is forwarded, so the three-argument
      /// <c>AsyncObserver.ToLookup</c> overload (which uses <see cref="EqualityComparer{T}.Default"/>) is applied.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Key type of the resulting lookup.</typeparam>
      /// <typeparam name="TValue">Value type of the resulting lookup.</typeparam>
      /// <param name="source">Sequence whose elements are grouped.</param>
      /// <param name="keySelector">Computes the key of an element.</param>
      /// <param name="valueSelector">Computes the value stored for an element.</param>
      /// <returns>An async observable producing the lookup.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          // The selectors travel as explicit state so the callback can stay static (no closure).
          return CreateAsyncObservable<ILookup<TKey, TValue>>.From(
              source,
              (keySelector, valueSelector),
              static (upstream, selectors, downstream) =>
              {
                  var (selectKey, selectValue) = selectors;
                  var sink = AsyncObserver.ToLookup(downstream, selectKey, selectValue);
                  return upstream.SubscribeSafeAsync(sink);
              });
      }

      /// <summary>
      /// Returns a sequence of <see cref="ILookup{TKey, TElement}"/> built from <paramref name="source"/> using
      /// synchronous key and value selectors and the supplied key <paramref name="comparer"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Key type of the resulting lookup.</typeparam>
      /// <typeparam name="TValue">Value type of the resulting lookup.</typeparam>
      /// <param name="source">Sequence whose elements are grouped.</param>
      /// <param name="keySelector">Computes the key of an element.</param>
      /// <param name="valueSelector">Computes the value stored for an element.</param>
      /// <param name="comparer">Equality comparer handed to the lookup for its keys.</param>
      /// <returns>An async observable producing the lookup.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // Selectors and comparer travel as explicit state so the callback can stay static (no closure).
          return CreateAsyncObservable<ILookup<TKey, TValue>>.From(
              source,
              (keySelector, valueSelector, comparer),
              static (upstream, settings, downstream) =>
              {
                  var (selectKey, selectValue, keyComparer) = settings;
                  var sink = AsyncObserver.ToLookup(downstream, selectKey, selectValue, keyComparer);
                  return upstream.SubscribeSafeAsync(sink);
              });
      }

      /// <summary>
      /// Returns a sequence of <see cref="ILookup{TKey, TElement}"/> built from <paramref name="source"/> using
      /// asynchronous (<see cref="ValueTask{TResult}"/>-returning) key and value selectors. No comparer is
      /// forwarded, so the three-argument <c>AsyncObserver.ToLookup</c> overload (which uses
      /// <see cref="EqualityComparer{T}.Default"/>) is applied.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Key type of the resulting lookup.</typeparam>
      /// <typeparam name="TValue">Value type of the resulting lookup.</typeparam>
      /// <param name="source">Sequence whose elements are grouped.</param>
      /// <param name="keySelector">Asynchronously computes the key of an element.</param>
      /// <param name="valueSelector">Asynchronously computes the value stored for an element.</param>
      /// <returns>An async observable producing the lookup.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TValue>> valueSelector)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          // The selectors travel as explicit state so the callback can stay static (no closure).
          return CreateAsyncObservable<ILookup<TKey, TValue>>.From(
              source,
              (keySelector, valueSelector),
              static (upstream, selectors, downstream) =>
              {
                  var (selectKey, selectValue) = selectors;
                  var sink = AsyncObserver.ToLookup(downstream, selectKey, selectValue);
                  return upstream.SubscribeSafeAsync(sink);
              });
      }

      /// <summary>
      /// Returns a sequence of <see cref="ILookup{TKey, TElement}"/> built from <paramref name="source"/> using
      /// asynchronous (<see cref="ValueTask{TResult}"/>-returning) key and value selectors and the supplied key
      /// <paramref name="comparer"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Key type of the resulting lookup.</typeparam>
      /// <typeparam name="TValue">Value type of the resulting lookup.</typeparam>
      /// <param name="source">Sequence whose elements are grouped.</param>
      /// <param name="keySelector">Asynchronously computes the key of an element.</param>
      /// <param name="valueSelector">Asynchronously computes the value stored for an element.</param>
      /// <param name="comparer">Equality comparer handed to the lookup for its keys.</param>
      /// <returns>An async observable producing the lookup.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TValue>> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // Selectors and comparer travel as explicit state so the callback can stay static (no closure).
          return CreateAsyncObservable<ILookup<TKey, TValue>>.From(
              source,
              (keySelector, valueSelector, comparer),
              static (upstream, settings, downstream) =>
              {
                  var (selectKey, selectValue, keyComparer) = settings;
                  var sink = AsyncObserver.ToLookup(downstream, selectKey, selectValue, keyComparer);
                  return upstream.SubscribeSafeAsync(sink);
              });
      }
  }
  /// <summary>
  /// <c>ToLookup</c> observer factories. Each one wraps an observer of <see cref="ILookup{TKey, TElement}"/> in an
  /// observer of source elements by delegating to <c>Aggregate</c> with a <c>Lookup</c> seed.
  /// NOTE(review): when the finished lookup is forwarded to the wrapped observer is decided by <c>Aggregate</c>,
  /// which is defined elsewhere - presumably on completion; confirm there.
  /// </summary>
  public partial class AsyncObserver
  {
      /// <summary>
      /// Creates a lookup-building observer with synchronous selectors, comparing keys with
      /// <see cref="EqualityComparer{T}.Default"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type received by the returned observer.</typeparam>
      /// <typeparam name="TKey">Key type of the lookup.</typeparam>
      /// <typeparam name="TValue">Value type of the lookup.</typeparam>
      /// <param name="observer">Observer that receives the lookup.</param>
      /// <param name="keySelector">Computes the key of an element.</param>
      /// <param name="valueSelector">Computes the value stored for an element.</param>
      /// <returns>An observer of source elements.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          var defaultComparer = EqualityComparer<TKey>.Default;

          return ToLookup(observer, keySelector, valueSelector, defaultComparer);
      }

      /// <summary>
      /// Creates a lookup-building observer with synchronous selectors and an explicit key comparer.
      /// The <c>Lookup</c> seed is allocated once, when this method is called; for every element the accumulator
      /// evaluates the key selector, then the value selector, and adds the pair to that same instance.
      /// </summary>
      /// <typeparam name="TSource">Element type received by the returned observer.</typeparam>
      /// <typeparam name="TKey">Key type of the lookup.</typeparam>
      /// <typeparam name="TValue">Value type of the lookup.</typeparam>
      /// <param name="observer">Observer that receives the lookup.</param>
      /// <param name="keySelector">Computes the key of an element.</param>
      /// <param name="valueSelector">Computes the value stored for an element.</param>
      /// <param name="comparer">Equality comparer passed to the lookup's constructor.</param>
      /// <returns>An observer of source elements.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          var seed = new Lookup<TKey, TValue>(comparer);

          return Aggregate<TSource, Lookup<TKey, TValue>, ILookup<TKey, TValue>>(
              observer,
              seed,
              (lookup, element) =>
              {
                  // Key first, then value: argument evaluation is left to right.
                  lookup.Add(keySelector(element), valueSelector(element));
                  return lookup;
              },
              lookup => lookup);
      }

      /// <summary>
      /// Creates a lookup-building observer with asynchronous selectors, comparing keys with
      /// <see cref="EqualityComparer{T}.Default"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type received by the returned observer.</typeparam>
      /// <typeparam name="TKey">Key type of the lookup.</typeparam>
      /// <typeparam name="TValue">Value type of the lookup.</typeparam>
      /// <param name="observer">Observer that receives the lookup.</param>
      /// <param name="keySelector">Asynchronously computes the key of an element.</param>
      /// <param name="valueSelector">Asynchronously computes the value stored for an element.</param>
      /// <returns>An observer of source elements.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TValue>> valueSelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          var defaultComparer = EqualityComparer<TKey>.Default;

          return ToLookup(observer, keySelector, valueSelector, defaultComparer);
      }

      /// <summary>
      /// Creates a lookup-building observer with asynchronous selectors and an explicit key comparer.
      /// The <c>Lookup</c> seed is allocated once, when this method is called; for every element the accumulator
      /// awaits the key selector, then the value selector (both with <c>ConfigureAwait(false)</c>), and adds the
      /// pair to that same instance.
      /// </summary>
      /// <typeparam name="TSource">Element type received by the returned observer.</typeparam>
      /// <typeparam name="TKey">Key type of the lookup.</typeparam>
      /// <typeparam name="TValue">Value type of the lookup.</typeparam>
      /// <param name="observer">Observer that receives the lookup.</param>
      /// <param name="keySelector">Asynchronously computes the key of an element.</param>
      /// <param name="valueSelector">Asynchronously computes the value stored for an element.</param>
      /// <param name="comparer">Equality comparer passed to the lookup's constructor.</param>
      /// <returns>An observer of source elements.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TValue>> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector is null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          var seed = new Lookup<TKey, TValue>(comparer);

          return Aggregate<TSource, Lookup<TKey, TValue>, ILookup<TKey, TValue>>(
              observer,
              seed,
              async (lookup, element) =>
              {
                  // The selectors are awaited one after the other, key first.
                  var elementKey = await keySelector(element).ConfigureAwait(false);
                  var elementValue = await valueSelector(element).ConfigureAwait(false);

                  lookup.Add(elementKey, elementValue);
                  return lookup;
              },
              lookup => new ValueTask<ILookup<TKey, TValue>>(lookup));
      }
  }
  137. }