DistinctUntilChanged.cs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading.Tasks;
  6. namespace System.Reactive.Linq
  7. {
  /// <summary>
  /// Observable-side <c>DistinctUntilChanged</c> overloads. Each overload validates its arguments and
  /// then hands <c>Create</c> a non-capturing callback that subscribes to the source with an observer
  /// produced by the matching <c>AsyncObserver.DistinctUntilChanged</c> overload.
  /// </summary>
  public partial class AsyncObservable
  {
      /// <summary>
      /// Builds a sequence whose subscribers are wrapped by <c>AsyncObserver.DistinctUntilChanged(observer)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to subscribe to.</param>
      /// <returns>The sequence returned by <c>Create</c>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
      public static IAsyncObservable<TSource> DistinctUntilChanged<TSource>(IAsyncObservable<TSource> source)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          return Create(
              source,
              static (upstream, downstream) =>
              {
                  var filter = AsyncObserver.DistinctUntilChanged(downstream);
                  return upstream.SubscribeSafeAsync(filter);
              });
      }

      /// <summary>
      /// Builds a sequence whose subscribers are wrapped by
      /// <c>AsyncObserver.DistinctUntilChanged(observer, comparer)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to subscribe to.</param>
      /// <param name="comparer">Equality comparer passed through to the observer-side overload.</param>
      /// <returns>The sequence returned by <c>Create</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="comparer"/> is null.
      /// </exception>
      public static IAsyncObservable<TSource> DistinctUntilChanged<TSource>(IAsyncObservable<TSource> source, IEqualityComparer<TSource> comparer)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // The comparer travels as explicit state so the callback can stay static (no closure).
          return Create(
              source,
              comparer,
              static (upstream, elementComparer, downstream) =>
              {
                  var filter = AsyncObserver.DistinctUntilChanged(downstream, elementComparer);
                  return upstream.SubscribeSafeAsync(filter);
              });
      }

      /// <summary>
      /// Builds a sequence whose subscribers are wrapped by
      /// <c>AsyncObserver.DistinctUntilChanged(observer, keySelector)</c> (synchronous key selector).
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
      /// <param name="source">Sequence to subscribe to.</param>
      /// <param name="keySelector">Function computing the key of an element.</param>
      /// <returns>The sequence returned by <c>Create</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="keySelector"/> is null.
      /// </exception>
      public static IAsyncObservable<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          return Create(
              source,
              keySelector,
              static (upstream, selector, downstream) =>
              {
                  var filter = AsyncObserver.DistinctUntilChanged(downstream, selector);
                  return upstream.SubscribeSafeAsync(filter);
              });
      }

      /// <summary>
      /// Builds a sequence whose subscribers are wrapped by
      /// <c>AsyncObserver.DistinctUntilChanged(observer, keySelector)</c> (asynchronous key selector).
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
      /// <param name="source">Sequence to subscribe to.</param>
      /// <param name="keySelector">Function asynchronously computing the key of an element.</param>
      /// <returns>The sequence returned by <c>Create</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="keySelector"/> is null.
      /// </exception>
      public static IAsyncObservable<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          return Create(
              source,
              keySelector,
              static (upstream, selector, downstream) =>
              {
                  var filter = AsyncObserver.DistinctUntilChanged(downstream, selector);
                  return upstream.SubscribeSafeAsync(filter);
              });
      }

      /// <summary>
      /// Builds a sequence whose subscribers are wrapped by
      /// <c>AsyncObserver.DistinctUntilChanged(observer, keySelector, comparer)</c> (synchronous key selector).
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
      /// <param name="source">Sequence to subscribe to.</param>
      /// <param name="keySelector">Function computing the key of an element.</param>
      /// <param name="comparer">Equality comparer applied to keys.</param>
      /// <returns>The sequence returned by <c>Create</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
      /// </exception>
      public static IAsyncObservable<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // Selector and comparer are bundled into one tuple because Create is given a single state value here.
          return Create(
              source,
              (keySelector, comparer),
              static (upstream, args, downstream) =>
              {
                  var filter = AsyncObserver.DistinctUntilChanged(downstream, args.keySelector, args.comparer);
                  return upstream.SubscribeSafeAsync(filter);
              });
      }

      /// <summary>
      /// Builds a sequence whose subscribers are wrapped by
      /// <c>AsyncObserver.DistinctUntilChanged(observer, keySelector, comparer)</c> (asynchronous key selector).
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
      /// <param name="source">Sequence to subscribe to.</param>
      /// <param name="keySelector">Function asynchronously computing the key of an element.</param>
      /// <param name="comparer">Equality comparer applied to keys.</param>
      /// <returns>The sequence returned by <c>Create</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
      /// </exception>
      public static IAsyncObservable<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // Selector and comparer are bundled into one tuple because Create is given a single state value here.
          return Create(
              source,
              (keySelector, comparer),
              static (upstream, args, downstream) =>
              {
                  var filter = AsyncObserver.DistinctUntilChanged(downstream, args.keySelector, args.comparer);
                  return upstream.SubscribeSafeAsync(filter);
              });
      }
  }
  /// <summary>
  /// Observer-side <c>DistinctUntilChanged</c> overloads. The returned observer forwards an element to
  /// the wrapped observer only when it is the first element seen or when its key is not equal to the
  /// key stored from the most recently forwarded element.
  /// </summary>
  public partial class AsyncObserver
  {
      /// <summary>
      /// Wraps <paramref name="observer"/> using the element itself as key and
      /// <c>EqualityComparer&lt;TSource&gt;.Default</c> for comparison.
      /// </summary>
      /// <typeparam name="TSource">Element type.</typeparam>
      /// <param name="observer">Observer receiving the filtered notifications.</param>
      /// <returns>The filtering observer.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      public static IAsyncObserver<TSource> DistinctUntilChanged<TSource>(IAsyncObserver<TSource> observer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          return DistinctUntilChanged(observer, item => item, EqualityComparer<TSource>.Default);
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> using the element itself as key and
      /// <paramref name="comparer"/> for comparison.
      /// </summary>
      /// <typeparam name="TSource">Element type.</typeparam>
      /// <param name="observer">Observer receiving the filtered notifications.</param>
      /// <param name="comparer">Equality comparer applied to consecutive elements.</param>
      /// <returns>The filtering observer.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="comparer"/> is null.
      /// </exception>
      public static IAsyncObserver<TSource> DistinctUntilChanged<TSource>(IAsyncObserver<TSource> observer, IEqualityComparer<TSource> comparer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          return DistinctUntilChanged(observer, item => item, comparer);
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> using a synchronous key selector and
      /// <c>EqualityComparer&lt;TKey&gt;.Default</c> for comparison.
      /// </summary>
      /// <typeparam name="TSource">Element type.</typeparam>
      /// <typeparam name="TKey">Key type.</typeparam>
      /// <param name="observer">Observer receiving the filtered notifications.</param>
      /// <param name="keySelector">Function computing the key of an element.</param>
      /// <returns>The filtering observer.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="keySelector"/> is null.
      /// </exception>
      public static IAsyncObserver<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObserver<TSource> observer, Func<TSource, TKey> keySelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          return DistinctUntilChanged(observer, keySelector, EqualityComparer<TKey>.Default);
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> using an asynchronous key selector and
      /// <c>EqualityComparer&lt;TKey&gt;.Default</c> for comparison.
      /// </summary>
      /// <typeparam name="TSource">Element type.</typeparam>
      /// <typeparam name="TKey">Key type.</typeparam>
      /// <param name="observer">Observer receiving the filtered notifications.</param>
      /// <param name="keySelector">Function asynchronously computing the key of an element.</param>
      /// <returns>The filtering observer.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="keySelector"/> is null.
      /// </exception>
      public static IAsyncObserver<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObserver<TSource> observer, Func<TSource, ValueTask<TKey>> keySelector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          return DistinctUntilChanged(observer, keySelector, EqualityComparer<TKey>.Default);
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> so that an element is forwarded only when its key, computed by
      /// the synchronous <paramref name="keySelector"/>, differs (per <paramref name="comparer"/>) from the
      /// key of the previously forwarded element. The first element is always forwarded.
      /// </summary>
      /// <remarks>
      /// If <paramref name="keySelector"/> or <paramref name="comparer"/> throws, the exception is passed to
      /// <c>observer.OnErrorAsync</c>, the current element is dropped and the stored key is left unchanged.
      /// Error and completion notifications are handed straight to <paramref name="observer"/>.
      /// </remarks>
      /// <typeparam name="TSource">Element type.</typeparam>
      /// <typeparam name="TKey">Key type.</typeparam>
      /// <param name="observer">Observer receiving the filtered notifications.</param>
      /// <param name="keySelector">Function computing the key of an element.</param>
      /// <param name="comparer">Equality comparer applied to consecutive keys.</param>
      /// <returns>The filtering observer.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
      /// </exception>
      public static IAsyncObserver<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObserver<TSource> observer, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // State captured by the OnNext handler below; it lives as long as the returned observer.
          var hasPrevious = false;
          var previousKey = default(TKey);

          return Create<TSource>(
              async value =>
              {
                  TKey key;

                  try
                  {
                      key = keySelector(value);
                  }
                  catch (Exception error)
                  {
                      await observer.OnErrorAsync(error).ConfigureAwait(false);
                      return;
                  }

                  // The comparer is consulted only once a previous key exists.
                  if (hasPrevious)
                  {
                      bool unchanged;

                      try
                      {
                          unchanged = comparer.Equals(previousKey, key);
                      }
                      catch (Exception error)
                      {
                          await observer.OnErrorAsync(error).ConfigureAwait(false);
                          return;
                      }

                      if (unchanged)
                      {
                          return;
                      }
                  }

                  // Record the new key before forwarding the element.
                  hasPrevious = true;
                  previousKey = key;

                  await observer.OnNextAsync(value).ConfigureAwait(false);
              },
              observer.OnErrorAsync,
              observer.OnCompletedAsync
          );
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> so that an element is forwarded only when its key, computed by
      /// awaiting the asynchronous <paramref name="keySelector"/>, differs (per <paramref name="comparer"/>)
      /// from the key of the previously forwarded element. The first element is always forwarded.
      /// </summary>
      /// <remarks>
      /// If invoking or awaiting <paramref name="keySelector"/> throws, or <paramref name="comparer"/> throws,
      /// the exception is passed to <c>observer.OnErrorAsync</c>, the current element is dropped and the
      /// stored key is left unchanged. Error and completion notifications are handed straight to
      /// <paramref name="observer"/>.
      /// </remarks>
      /// <typeparam name="TSource">Element type.</typeparam>
      /// <typeparam name="TKey">Key type.</typeparam>
      /// <param name="observer">Observer receiving the filtered notifications.</param>
      /// <param name="keySelector">Function asynchronously computing the key of an element.</param>
      /// <param name="comparer">Equality comparer applied to consecutive keys.</param>
      /// <returns>The filtering observer.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
      /// </exception>
      public static IAsyncObserver<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObserver<TSource> observer, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (keySelector is null)
          {
              throw new ArgumentNullException(nameof(keySelector));
          }

          if (comparer is null)
          {
              throw new ArgumentNullException(nameof(comparer));
          }

          // State captured by the OnNext handler below; it lives as long as the returned observer.
          var hasPrevious = false;
          var previousKey = default(TKey);

          return Create<TSource>(
              async value =>
              {
                  TKey key;

                  try
                  {
                      key = await keySelector(value).ConfigureAwait(false);
                  }
                  catch (Exception error)
                  {
                      await observer.OnErrorAsync(error).ConfigureAwait(false);
                      return;
                  }

                  // The comparer is consulted only once a previous key exists.
                  if (hasPrevious)
                  {
                      bool unchanged;

                      try
                      {
                          unchanged = comparer.Equals(previousKey, key);
                      }
                      catch (Exception error)
                      {
                          await observer.OnErrorAsync(error).ConfigureAwait(false);
                          return;
                      }

                      if (unchanged)
                      {
                          return;
                      }
                  }

                  // Record the new key before forwarding the element.
                  hasPrevious = true;
                  previousKey = key;

                  await observer.OnNextAsync(value).ConfigureAwait(false);
              },
              observer.OnErrorAsync,
              observer.OnCompletedAsync
          );
      }
  }
  203. }