ToLookup.cs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  // ToLookup operators on asynchronous observable sequences. Each overload validates its
  // arguments eagerly and then wraps the source with the matching AsyncObserver.ToLookup observer.
  partial class AsyncObservable
  {
      /// <summary>
      /// Returns a sequence of <see cref="ILookup{TKey, TValue}"/> obtained by subscribing the
      /// observer built by <c>AsyncObserver.ToLookup</c> to <paramref name="source"/>, using
      /// synchronous key and value selectors.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Type of the keys in the lookup.</typeparam>
      /// <typeparam name="TValue">Type of the values in the lookup.</typeparam>
      /// <param name="source">Sequence whose elements are turned into lookup entries.</param>
      /// <param name="keySelector">Function that computes the key for an element.</param>
      /// <param name="valueSelector">Function that computes the value for an element.</param>
      /// <returns>A sequence created through <c>Create</c>; the source is subscribed to per subscription.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          return Create<ILookup<TKey, TValue>>(
              downstream => source.SubscribeSafeAsync(
                  AsyncObserver.ToLookup(downstream, keySelector, valueSelector)));
      }

      /// <summary>
      /// Returns a sequence of <see cref="ILookup{TKey, TValue}"/> obtained by subscribing the
      /// observer built by <c>AsyncObserver.ToLookup</c> to <paramref name="source"/>, using
      /// synchronous key and value selectors and an explicit key comparer.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Type of the keys in the lookup.</typeparam>
      /// <typeparam name="TValue">Type of the values in the lookup.</typeparam>
      /// <param name="source">Sequence whose elements are turned into lookup entries.</param>
      /// <param name="keySelector">Function that computes the key for an element.</param>
      /// <param name="valueSelector">Function that computes the value for an element.</param>
      /// <param name="comparer">Comparer forwarded to the lookup for its keys.</param>
      /// <returns>A sequence created through <c>Create</c>; the source is subscribed to per subscription.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer == null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          return Create<ILookup<TKey, TValue>>(
              downstream => source.SubscribeSafeAsync(
                  AsyncObserver.ToLookup(downstream, keySelector, valueSelector, comparer)));
      }

      /// <summary>
      /// Returns a sequence of <see cref="ILookup{TKey, TValue}"/> obtained by subscribing the
      /// observer built by <c>AsyncObserver.ToLookup</c> to <paramref name="source"/>, using
      /// task-returning key and value selectors.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Type of the keys in the lookup.</typeparam>
      /// <typeparam name="TValue">Type of the values in the lookup.</typeparam>
      /// <param name="source">Sequence whose elements are turned into lookup entries.</param>
      /// <param name="keySelector">Asynchronous function that computes the key for an element.</param>
      /// <param name="valueSelector">Asynchronous function that computes the value for an element.</param>
      /// <returns>A sequence created through <c>Create</c>; the source is subscribed to per subscription.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TValue>> valueSelector)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          return Create<ILookup<TKey, TValue>>(
              downstream => source.SubscribeSafeAsync(
                  AsyncObserver.ToLookup(downstream, keySelector, valueSelector)));
      }

      /// <summary>
      /// Returns a sequence of <see cref="ILookup{TKey, TValue}"/> obtained by subscribing the
      /// observer built by <c>AsyncObserver.ToLookup</c> to <paramref name="source"/>, using
      /// task-returning key and value selectors and an explicit key comparer.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TKey">Type of the keys in the lookup.</typeparam>
      /// <typeparam name="TValue">Type of the values in the lookup.</typeparam>
      /// <param name="source">Sequence whose elements are turned into lookup entries.</param>
      /// <param name="keySelector">Asynchronous function that computes the key for an element.</param>
      /// <param name="valueSelector">Asynchronous function that computes the value for an element.</param>
      /// <param name="comparer">Comparer forwarded to the lookup for its keys.</param>
      /// <returns>A sequence created through <c>Create</c>; the source is subscribed to per subscription.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObservable<ILookup<TKey, TValue>> ToLookup<TSource, TKey, TValue>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TValue>> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer == null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          return Create<ILookup<TKey, TValue>>(
              downstream => source.SubscribeSafeAsync(
                  AsyncObserver.ToLookup(downstream, keySelector, valueSelector, comparer)));
      }
  }
  // Observer factories behind the ToLookup operator. The comparer-less overloads forward to the
  // comparer overloads with EqualityComparer<TKey>.Default; the comparer overloads delegate to Aggregate.
  partial class AsyncObserver
  {
      /// <summary>
      /// Creates an observer over <typeparamref name="TSource"/> that builds a lookup with
      /// synchronous selectors and the default equality comparer for <typeparamref name="TKey"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type received by the returned observer.</typeparam>
      /// <typeparam name="TKey">Type of the keys in the lookup.</typeparam>
      /// <typeparam name="TValue">Type of the values in the lookup.</typeparam>
      /// <param name="observer">Observer that receives the resulting lookup.</param>
      /// <param name="keySelector">Function that computes the key for an element.</param>
      /// <param name="valueSelector">Function that computes the value for an element.</param>
      /// <returns>The observer produced by the comparer-taking overload.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          return ToLookup(
              observer,
              keySelector,
              valueSelector,
              EqualityComparer<TKey>.Default);
      }

      /// <summary>
      /// Creates an observer over <typeparamref name="TSource"/> that adds one key/value pair per
      /// element to a <c>Lookup</c> constructed with <paramref name="comparer"/>, by way of <c>Aggregate</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type received by the returned observer.</typeparam>
      /// <typeparam name="TKey">Type of the keys in the lookup.</typeparam>
      /// <typeparam name="TValue">Type of the values in the lookup.</typeparam>
      /// <param name="observer">Observer that receives the resulting lookup.</param>
      /// <param name="keySelector">Function that computes the key for an element.</param>
      /// <param name="valueSelector">Function that computes the value for an element.</param>
      /// <param name="comparer">Comparer handed to the lookup for its keys.</param>
      /// <returns>The observer returned by <c>Aggregate</c>.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, TKey> keySelector, Func<TSource, TValue> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer == null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          return Aggregate<TSource, Lookup<TKey, TValue>, ILookup<TKey, TValue>>(
              observer,
              new Lookup<TKey, TValue>(comparer),
              (lookup, item) =>
              {
                  // Arguments evaluate left to right: the key selector runs before the value selector.
                  lookup.Add(keySelector(item), valueSelector(item));
                  return lookup;
              },
              // The accumulated lookup itself is the result, exposed through its interface.
              lookup => lookup
          );
      }

      /// <summary>
      /// Creates an observer over <typeparamref name="TSource"/> that builds a lookup with
      /// task-returning selectors and the default equality comparer for <typeparamref name="TKey"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type received by the returned observer.</typeparam>
      /// <typeparam name="TKey">Type of the keys in the lookup.</typeparam>
      /// <typeparam name="TValue">Type of the values in the lookup.</typeparam>
      /// <param name="observer">Observer that receives the resulting lookup.</param>
      /// <param name="keySelector">Asynchronous function that computes the key for an element.</param>
      /// <param name="valueSelector">Asynchronous function that computes the value for an element.</param>
      /// <returns>The observer produced by the comparer-taking overload.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TValue>> valueSelector)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          return ToLookup(
              observer,
              keySelector,
              valueSelector,
              EqualityComparer<TKey>.Default);
      }

      /// <summary>
      /// Creates an observer over <typeparamref name="TSource"/> that awaits the key and then the
      /// value for each element and adds the pair to a <c>Lookup</c> constructed with
      /// <paramref name="comparer"/>, by way of <c>Aggregate</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type received by the returned observer.</typeparam>
      /// <typeparam name="TKey">Type of the keys in the lookup.</typeparam>
      /// <typeparam name="TValue">Type of the values in the lookup.</typeparam>
      /// <param name="observer">Observer that receives the resulting lookup.</param>
      /// <param name="keySelector">Asynchronous function that computes the key for an element.</param>
      /// <param name="valueSelector">Asynchronous function that computes the value for an element.</param>
      /// <param name="comparer">Comparer handed to the lookup for its keys.</param>
      /// <returns>The observer returned by <c>Aggregate</c>.</returns>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> ToLookup<TSource, TKey, TValue>(IAsyncObserver<ILookup<TKey, TValue>> observer, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TValue>> valueSelector, IEqualityComparer<TKey> comparer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector == null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (valueSelector == null)
          {
              throw new ArgumentNullException(nameof(valueSelector));
          }

          if (comparer == null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          return Aggregate<TSource, Lookup<TKey, TValue>, ILookup<TKey, TValue>>(
              observer,
              new Lookup<TKey, TValue>(comparer),
              async (lookup, item) =>
              {
                  // The key is awaited to completion before the value selector is invoked.
                  lookup.Add(
                      await keySelector(item).ConfigureAwait(false),
                      await valueSelector(item).ConfigureAwait(false));
                  return lookup;
              },
              // Wrap the finished lookup in a completed task typed as the interface.
              lookup => Task.FromResult<ILookup<TKey, TValue>>(lookup)
          );
      }
  }
  125. }