Distinct.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerableEx
  11. {
  12. public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
  13. {
  14. if (source == null)
  15. throw Error.ArgumentNull(nameof(source));
  16. if (keySelector == null)
  17. throw Error.ArgumentNull(nameof(keySelector));
  18. return DistinctCore(source, keySelector, comparer: null);
  19. }
  20. public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  21. {
  22. if (source == null)
  23. throw Error.ArgumentNull(nameof(source));
  24. if (keySelector == null)
  25. throw Error.ArgumentNull(nameof(keySelector));
  26. return DistinctCore(source, keySelector, comparer);
  27. }
  28. public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector)
  29. {
  30. if (source == null)
  31. throw Error.ArgumentNull(nameof(source));
  32. if (keySelector == null)
  33. throw Error.ArgumentNull(nameof(keySelector));
  34. return DistinctCore<TSource, TKey>(source, keySelector, comparer: null);
  35. }
  36. public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  37. {
  38. if (source == null)
  39. throw Error.ArgumentNull(nameof(source));
  40. if (keySelector == null)
  41. throw Error.ArgumentNull(nameof(keySelector));
  42. if (comparer == null)
  43. throw Error.ArgumentNull(nameof(comparer));
  44. return DistinctCore(source, keySelector, comparer);
  45. }
  46. private static IAsyncEnumerable<TSource> DistinctCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  47. {
  48. return new DistinctAsyncIterator<TSource, TKey>(source, keySelector, comparer);
  49. }
  50. private static IAsyncEnumerable<TSource> DistinctCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  51. {
  52. return new DistinctAsyncIteratorWithTask<TSource, TKey>(source, keySelector, comparer);
  53. }
  54. private sealed class DistinctAsyncIterator<TSource, TKey> : AsyncIterator<TSource>, IAsyncIListProvider<TSource>
  55. {
  56. private readonly IEqualityComparer<TKey> _comparer;
  57. private readonly Func<TSource, TKey> _keySelector;
  58. private readonly IAsyncEnumerable<TSource> _source;
  59. private IAsyncEnumerator<TSource> _enumerator;
  60. private Set<TKey> _set;
  61. public DistinctAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  62. {
  63. Debug.Assert(source != null);
  64. Debug.Assert(keySelector != null);
  65. _source = source;
  66. _keySelector = keySelector;
  67. _comparer = comparer;
  68. }
  69. public async Task<TSource[]> ToArrayAsync(CancellationToken cancellationToken)
  70. {
  71. var s = await FillSetAsync(cancellationToken).ConfigureAwait(false);
  72. return s.ToArray();
  73. }
  74. public async Task<List<TSource>> ToListAsync(CancellationToken cancellationToken)
  75. {
  76. var s = await FillSetAsync(cancellationToken).ConfigureAwait(false);
  77. return s;
  78. }
  79. public async Task<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
  80. {
  81. if (onlyIfCheap)
  82. {
  83. return -1;
  84. }
  85. var count = 0;
  86. var s = new Set<TKey>(_comparer);
  87. var enu = _source.GetAsyncEnumerator(cancellationToken);
  88. try
  89. {
  90. while (await enu.MoveNextAsync().ConfigureAwait(false))
  91. {
  92. var item = enu.Current;
  93. if (s.Add(_keySelector(item)))
  94. {
  95. count++;
  96. }
  97. }
  98. }
  99. finally
  100. {
  101. await enu.DisposeAsync().ConfigureAwait(false);
  102. }
  103. return count;
  104. }
  105. public override AsyncIterator<TSource> Clone()
  106. {
  107. return new DistinctAsyncIterator<TSource, TKey>(_source, _keySelector, _comparer);
  108. }
  109. public override async ValueTask DisposeAsync()
  110. {
  111. if (_enumerator != null)
  112. {
  113. await _enumerator.DisposeAsync().ConfigureAwait(false);
  114. _enumerator = null;
  115. _set = null;
  116. }
  117. await base.DisposeAsync().ConfigureAwait(false);
  118. }
  119. protected override async ValueTask<bool> MoveNextCore()
  120. {
  121. switch (state)
  122. {
  123. case AsyncIteratorState.Allocated:
  124. _enumerator = _source.GetAsyncEnumerator(cancellationToken);
  125. if (!await _enumerator.MoveNextAsync().ConfigureAwait(false))
  126. {
  127. await DisposeAsync().ConfigureAwait(false);
  128. return false;
  129. }
  130. var element = _enumerator.Current;
  131. _set = new Set<TKey>(_comparer);
  132. _set.Add(_keySelector(element));
  133. current = element;
  134. state = AsyncIteratorState.Iterating;
  135. return true;
  136. case AsyncIteratorState.Iterating:
  137. while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  138. {
  139. element = _enumerator.Current;
  140. if (_set.Add(_keySelector(element)))
  141. {
  142. current = element;
  143. return true;
  144. }
  145. }
  146. break;
  147. }
  148. await DisposeAsync().ConfigureAwait(false);
  149. return false;
  150. }
  151. private async Task<List<TSource>> FillSetAsync(CancellationToken cancellationToken)
  152. {
  153. var s = new Set<TKey>(_comparer);
  154. var r = new List<TSource>();
  155. var enu = _source.GetAsyncEnumerator(cancellationToken);
  156. try
  157. {
  158. while (await enu.MoveNextAsync().ConfigureAwait(false))
  159. {
  160. var item = enu.Current;
  161. if (s.Add(_keySelector(item)))
  162. {
  163. r.Add(item);
  164. }
  165. }
  166. }
  167. finally
  168. {
  169. await enu.DisposeAsync().ConfigureAwait(false);
  170. }
  171. return r;
  172. }
  173. }
  174. private sealed class DistinctAsyncIteratorWithTask<TSource, TKey> : AsyncIterator<TSource>, IAsyncIListProvider<TSource>
  175. {
  176. private readonly IEqualityComparer<TKey> _comparer;
  177. private readonly Func<TSource, Task<TKey>> _keySelector;
  178. private readonly IAsyncEnumerable<TSource> _source;
  179. private IAsyncEnumerator<TSource> _enumerator;
  180. private Set<TKey> _set;
  181. public DistinctAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  182. {
  183. Debug.Assert(source != null);
  184. Debug.Assert(keySelector != null);
  185. _source = source;
  186. _keySelector = keySelector;
  187. _comparer = comparer;
  188. }
  189. public async Task<TSource[]> ToArrayAsync(CancellationToken cancellationToken)
  190. {
  191. var s = await FillSetAsync(cancellationToken).ConfigureAwait(false);
  192. return s.ToArray();
  193. }
  194. public async Task<List<TSource>> ToListAsync(CancellationToken cancellationToken)
  195. {
  196. var s = await FillSetAsync(cancellationToken).ConfigureAwait(false);
  197. return s;
  198. }
  199. public async Task<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
  200. {
  201. if (onlyIfCheap)
  202. {
  203. return -1;
  204. }
  205. var count = 0;
  206. var s = new Set<TKey>(_comparer);
  207. var enu = _source.GetAsyncEnumerator(cancellationToken);
  208. try
  209. {
  210. while (await enu.MoveNextAsync().ConfigureAwait(false))
  211. {
  212. var item = enu.Current;
  213. if (s.Add(await _keySelector(item).ConfigureAwait(false)))
  214. {
  215. count++;
  216. }
  217. }
  218. }
  219. finally
  220. {
  221. await enu.DisposeAsync().ConfigureAwait(false);
  222. }
  223. return count;
  224. }
  225. public override AsyncIterator<TSource> Clone()
  226. {
  227. return new DistinctAsyncIteratorWithTask<TSource, TKey>(_source, _keySelector, _comparer);
  228. }
  229. public override async ValueTask DisposeAsync()
  230. {
  231. if (_enumerator != null)
  232. {
  233. await _enumerator.DisposeAsync().ConfigureAwait(false);
  234. _enumerator = null;
  235. _set = null;
  236. }
  237. await base.DisposeAsync().ConfigureAwait(false);
  238. }
  239. protected override async ValueTask<bool> MoveNextCore()
  240. {
  241. switch (state)
  242. {
  243. case AsyncIteratorState.Allocated:
  244. _enumerator = _source.GetAsyncEnumerator(cancellationToken);
  245. if (!await _enumerator.MoveNextAsync().ConfigureAwait(false))
  246. {
  247. await DisposeAsync().ConfigureAwait(false);
  248. return false;
  249. }
  250. var element = _enumerator.Current;
  251. _set = new Set<TKey>(_comparer);
  252. _set.Add(await _keySelector(element).ConfigureAwait(false));
  253. current = element;
  254. state = AsyncIteratorState.Iterating;
  255. return true;
  256. case AsyncIteratorState.Iterating:
  257. while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
  258. {
  259. element = _enumerator.Current;
  260. if (_set.Add(await _keySelector(element).ConfigureAwait(false)))
  261. {
  262. current = element;
  263. return true;
  264. }
  265. }
  266. break;
  267. }
  268. await DisposeAsync().ConfigureAwait(false);
  269. return false;
  270. }
  271. private async Task<List<TSource>> FillSetAsync(CancellationToken cancellationToken)
  272. {
  273. var s = new Set<TKey>(_comparer);
  274. var r = new List<TSource>();
  275. var enu = _source.GetAsyncEnumerator(cancellationToken);
  276. try
  277. {
  278. while (await enu.MoveNextAsync().ConfigureAwait(false))
  279. {
  280. var item = enu.Current;
  281. if (s.Add(await _keySelector(item).ConfigureAwait(false)))
  282. {
  283. r.Add(item);
  284. }
  285. }
  286. }
  287. finally
  288. {
  289. await enu.DisposeAsync().ConfigureAwait(false);
  290. }
  291. return r;
  292. }
  293. }
  294. }
  295. }