DistinctUntilChanged.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerableEx
  11. {
  12. public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource>(this IAsyncEnumerable<TSource> source)
  13. {
  14. if (source == null)
  15. throw new ArgumentNullException(nameof(source));
  16. return source.DistinctUntilChanged(EqualityComparer<TSource>.Default);
  17. }
  18. public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource>(this IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
  19. {
  20. if (source == null)
  21. throw new ArgumentNullException(nameof(source));
  22. if (comparer == null)
  23. throw new ArgumentNullException(nameof(comparer));
  24. return new DistinctUntilChangedAsyncIterator<TSource>(source, comparer);
  25. }
  26. public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
  27. {
  28. if (source == null)
  29. throw new ArgumentNullException(nameof(source));
  30. if (keySelector == null)
  31. throw new ArgumentNullException(nameof(keySelector));
  32. return source.DistinctUntilChanged_(keySelector, EqualityComparer<TKey>.Default);
  33. }
  34. public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  35. {
  36. if (source == null)
  37. throw new ArgumentNullException(nameof(source));
  38. if (keySelector == null)
  39. throw new ArgumentNullException(nameof(keySelector));
  40. if (comparer == null)
  41. throw new ArgumentNullException(nameof(comparer));
  42. return source.DistinctUntilChanged_(keySelector, comparer);
  43. }
  44. public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector)
  45. {
  46. if (source == null)
  47. throw new ArgumentNullException(nameof(source));
  48. if (keySelector == null)
  49. throw new ArgumentNullException(nameof(keySelector));
  50. return source.DistinctUntilChanged_(keySelector, EqualityComparer<TKey>.Default);
  51. }
  52. public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  53. {
  54. if (source == null)
  55. throw new ArgumentNullException(nameof(source));
  56. if (keySelector == null)
  57. throw new ArgumentNullException(nameof(keySelector));
  58. if (comparer == null)
  59. throw new ArgumentNullException(nameof(comparer));
  60. return source.DistinctUntilChanged_(keySelector, comparer);
  61. }
  62. private static IAsyncEnumerable<TSource> DistinctUntilChanged_<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  63. {
  64. return new DistinctUntilChangedAsyncIterator<TSource, TKey>(source, keySelector, comparer);
  65. }
  66. private static IAsyncEnumerable<TSource> DistinctUntilChanged_<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  67. {
  68. return new DistinctUntilChangedAsyncIteratorWithTask<TSource, TKey>(source, keySelector, comparer);
  69. }
  70. private sealed class DistinctUntilChangedAsyncIterator<TSource> : AsyncIterator<TSource>
  71. {
  72. private readonly IEqualityComparer<TSource> comparer;
  73. private readonly IAsyncEnumerable<TSource> source;
  74. private TSource currentValue;
  75. private IAsyncEnumerator<TSource> enumerator;
  76. private bool hasCurrentValue;
  77. public DistinctUntilChangedAsyncIterator(IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
  78. {
  79. Debug.Assert(comparer != null);
  80. Debug.Assert(source != null);
  81. this.source = source;
  82. this.comparer = comparer;
  83. }
  84. public override AsyncIterator<TSource> Clone()
  85. {
  86. return new DistinctUntilChangedAsyncIterator<TSource>(source, comparer);
  87. }
  88. public override async Task DisposeAsync()
  89. {
  90. if (enumerator != null)
  91. {
  92. await enumerator.DisposeAsync().ConfigureAwait(false);
  93. enumerator = null;
  94. currentValue = default(TSource);
  95. }
  96. await base.DisposeAsync().ConfigureAwait(false);
  97. }
  98. protected override async Task<bool> MoveNextCore()
  99. {
  100. switch (state)
  101. {
  102. case AsyncIteratorState.Allocated:
  103. enumerator = source.GetAsyncEnumerator();
  104. state = AsyncIteratorState.Iterating;
  105. goto case AsyncIteratorState.Iterating;
  106. case AsyncIteratorState.Iterating:
  107. while (await enumerator.MoveNextAsync().ConfigureAwait(false))
  108. {
  109. var item = enumerator.Current;
  110. var comparerEquals = false;
  111. if (hasCurrentValue)
  112. {
  113. comparerEquals = comparer.Equals(currentValue, item);
  114. }
  115. if (!hasCurrentValue || !comparerEquals)
  116. {
  117. hasCurrentValue = true;
  118. currentValue = item;
  119. current = item;
  120. return true;
  121. }
  122. }
  123. break;
  124. }
  125. await DisposeAsync().ConfigureAwait(false);
  126. return false;
  127. }
  128. }
  129. private sealed class DistinctUntilChangedAsyncIterator<TSource, TKey> : AsyncIterator<TSource>
  130. {
  131. private readonly IEqualityComparer<TKey> comparer;
  132. private readonly Func<TSource, TKey> keySelector;
  133. private readonly IAsyncEnumerable<TSource> source;
  134. private TKey currentKeyValue;
  135. private IAsyncEnumerator<TSource> enumerator;
  136. private bool hasCurrentKey;
  137. public DistinctUntilChangedAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  138. {
  139. this.source = source;
  140. this.keySelector = keySelector;
  141. this.comparer = comparer;
  142. }
  143. public override AsyncIterator<TSource> Clone()
  144. {
  145. return new DistinctUntilChangedAsyncIterator<TSource, TKey>(source, keySelector, comparer);
  146. }
  147. public override async Task DisposeAsync()
  148. {
  149. if (enumerator != null)
  150. {
  151. await enumerator.DisposeAsync().ConfigureAwait(false);
  152. enumerator = null;
  153. currentKeyValue = default(TKey);
  154. }
  155. await base.DisposeAsync().ConfigureAwait(false);
  156. }
  157. protected override async Task<bool> MoveNextCore()
  158. {
  159. switch (state)
  160. {
  161. case AsyncIteratorState.Allocated:
  162. enumerator = source.GetAsyncEnumerator();
  163. state = AsyncIteratorState.Iterating;
  164. goto case AsyncIteratorState.Iterating;
  165. case AsyncIteratorState.Iterating:
  166. while (await enumerator.MoveNextAsync().ConfigureAwait(false))
  167. {
  168. var item = enumerator.Current;
  169. var key = keySelector(item);
  170. var comparerEquals = false;
  171. if (hasCurrentKey)
  172. {
  173. comparerEquals = comparer.Equals(currentKeyValue, key);
  174. }
  175. if (!hasCurrentKey || !comparerEquals)
  176. {
  177. hasCurrentKey = true;
  178. currentKeyValue = key;
  179. current = item;
  180. return true;
  181. }
  182. }
  183. break; // case
  184. }
  185. await DisposeAsync().ConfigureAwait(false);
  186. return false;
  187. }
  188. }
  189. private sealed class DistinctUntilChangedAsyncIteratorWithTask<TSource, TKey> : AsyncIterator<TSource>
  190. {
  191. private readonly IEqualityComparer<TKey> comparer;
  192. private readonly Func<TSource, Task<TKey>> keySelector;
  193. private readonly IAsyncEnumerable<TSource> source;
  194. private TKey currentKeyValue;
  195. private IAsyncEnumerator<TSource> enumerator;
  196. private bool hasCurrentKey;
  197. public DistinctUntilChangedAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  198. {
  199. this.source = source;
  200. this.keySelector = keySelector;
  201. this.comparer = comparer;
  202. }
  203. public override AsyncIterator<TSource> Clone()
  204. {
  205. return new DistinctUntilChangedAsyncIteratorWithTask<TSource, TKey>(source, keySelector, comparer);
  206. }
  207. public override async Task DisposeAsync()
  208. {
  209. if (enumerator != null)
  210. {
  211. await enumerator.DisposeAsync().ConfigureAwait(false);
  212. enumerator = null;
  213. currentKeyValue = default(TKey);
  214. }
  215. await base.DisposeAsync().ConfigureAwait(false);
  216. }
  217. protected override async Task<bool> MoveNextCore()
  218. {
  219. switch (state)
  220. {
  221. case AsyncIteratorState.Allocated:
  222. enumerator = source.GetAsyncEnumerator();
  223. state = AsyncIteratorState.Iterating;
  224. goto case AsyncIteratorState.Iterating;
  225. case AsyncIteratorState.Iterating:
  226. while (await enumerator.MoveNextAsync().ConfigureAwait(false))
  227. {
  228. var item = enumerator.Current;
  229. var key = await keySelector(item).ConfigureAwait(false);
  230. var comparerEquals = false;
  231. if (hasCurrentKey)
  232. {
  233. comparerEquals = comparer.Equals(currentKeyValue, key);
  234. }
  235. if (!hasCurrentKey || !comparerEquals)
  236. {
  237. hasCurrentKey = true;
  238. currentKeyValue = key;
  239. current = item;
  240. return true;
  241. }
  242. }
  243. break; // case
  244. }
  245. await DisposeAsync().ConfigureAwait(false);
  246. return false;
  247. }
  248. }
  249. }
  250. }