Distinct.cs 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerableEx
  10. {
  /// <summary>
  /// Returns an async-enumerable sequence that contains only distinct elements according to the keySelector.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  /// <param name="source">An async-enumerable sequence to retain distinct elements for.</param>
  /// <param name="keySelector">A function to compute the comparison key for each element.</param>
  /// <returns>An async-enumerable sequence only containing the distinct elements, based on a computed key value, from the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <remarks>Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.</remarks>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
  {
      if (source == null)
          throw Error.ArgumentNull(nameof(source));
      if (keySelector == null)
          throw Error.ArgumentNull(nameof(keySelector));

      // No comparer supplied: the core receives null and uses its default key comparison.
      return DistinctCore(source, keySelector, comparer: null);
  }
  /// <summary>
  /// Returns an async-enumerable sequence that contains only distinct elements according to the keySelector and the comparer.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  /// <param name="source">An async-enumerable sequence to retain distinct elements for.</param>
  /// <param name="keySelector">A function to compute the comparison key for each element.</param>
  /// <param name="comparer">
  /// Equality comparer for the computed keys. May be <c>null</c>, in which case keys are compared the same way
  /// as by the overload that takes no comparer.
  /// </param>
  /// <returns>An async-enumerable sequence only containing the distinct elements, based on a computed key value, from the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <remarks>Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.</remarks>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey>? comparer)
  {
      if (source == null)
          throw Error.ArgumentNull(nameof(source));
      if (keySelector == null)
          throw Error.ArgumentNull(nameof(keySelector));

      // comparer is intentionally not validated: null is a legal value (see the overload without a comparer).
      return DistinctCore(source, keySelector, comparer);
  }
  /// <summary>
  /// Returns an async-enumerable sequence that contains only distinct elements according to the asynchronous keySelector.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  /// <param name="source">An async-enumerable sequence to retain distinct elements for.</param>
  /// <param name="keySelector">An asynchronous function to compute the comparison key for each element.</param>
  /// <returns>An async-enumerable sequence only containing the distinct elements, based on a computed key value, from the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <remarks>Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.</remarks>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector)
  {
      if (source == null)
          throw Error.ArgumentNull(nameof(source));
      if (keySelector == null)
          throw Error.ArgumentNull(nameof(keySelector));

      // Explicit type arguments pin TKey. With a null comparer, inference alone would also make the
      // synchronous-selector DistinctCore applicable (TKey inferred as ValueTask<TKey>), risking an
      // ambiguous call.
      return DistinctCore<TSource, TKey>(source, keySelector, comparer: null);
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Returns an async-enumerable sequence that contains only distinct elements according to the asynchronous (cancellable) keySelector.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  /// <param name="source">An async-enumerable sequence to retain distinct elements for.</param>
  /// <param name="keySelector">
  /// An asynchronous (cancellable) function to compute the comparison key for each element. It receives the
  /// cancellation token of the enumeration (or of the materializing operation) that requests the key.
  /// </param>
  /// <returns>An async-enumerable sequence only containing the distinct elements, based on a computed key value, from the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <remarks>Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.</remarks>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector)
  {
      if (source == null)
          throw Error.ArgumentNull(nameof(source));
      if (keySelector == null)
          throw Error.ArgumentNull(nameof(keySelector));

      return DistinctCore<TSource, TKey>(source, keySelector, comparer: null);
  }
  #endif
  /// <summary>
  /// Returns an async-enumerable sequence that contains only distinct elements according to the asynchronous keySelector and the comparer.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  /// <param name="source">An async-enumerable sequence to retain distinct elements for.</param>
  /// <param name="keySelector">An asynchronous function to compute the comparison key for each element.</param>
  /// <param name="comparer">
  /// Equality comparer for the computed keys. May be <c>null</c>, in which case keys are compared the same way
  /// as by the overload that takes no comparer.
  /// </param>
  /// <returns>An async-enumerable sequence only containing the distinct elements, based on a computed key value, from the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <remarks>Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.</remarks>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer)
  {
      if (source == null)
          throw Error.ArgumentNull(nameof(source));
      if (keySelector == null)
          throw Error.ArgumentNull(nameof(keySelector));

      // comparer is intentionally not validated: null is a legal value (see the overload without a comparer).
      return DistinctCore(source, keySelector, comparer);
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Returns an async-enumerable sequence that contains only distinct elements according to the asynchronous (cancellable) keySelector and the comparer.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  /// <param name="source">An async-enumerable sequence to retain distinct elements for.</param>
  /// <param name="keySelector">An asynchronous (cancellable) function to compute the comparison key for each element.</param>
  /// <param name="comparer">
  /// Equality comparer for the computed keys. May be <c>null</c>, in which case keys are compared the same way
  /// as by the overload that takes no comparer.
  /// </param>
  /// <returns>An async-enumerable sequence only containing the distinct elements, based on a computed key value, from the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <remarks>Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.</remarks>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer)
  {
      if (source == null)
          throw Error.ArgumentNull(nameof(source));
      if (keySelector == null)
          throw Error.ArgumentNull(nameof(keySelector));

      // comparer is intentionally not validated: null is a legal value (see the overload without a comparer).
      return DistinctCore(source, keySelector, comparer);
  }
  #endif
  // Builds the lazy iterator for a synchronous key selector; arguments are assumed already validated.
  private static IAsyncEnumerable<TSource> DistinctCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey>? comparer) =>
      new DistinctAsyncIterator<TSource, TKey>(source, keySelector, comparer);
  // Builds the lazy iterator for a ValueTask-returning key selector; arguments are assumed already validated.
  private static IAsyncEnumerable<TSource> DistinctCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer) =>
      new DistinctAsyncIteratorWithTask<TSource, TKey>(source, keySelector, comparer);
  #if !NO_DEEP_CANCELLATION
  // Builds the lazy iterator for a cancellable, ValueTask-returning key selector; arguments are assumed already validated.
  private static IAsyncEnumerable<TSource> DistinctCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer) =>
      new DistinctAsyncIteratorWithTaskAndCancellation<TSource, TKey>(source, keySelector, comparer);
  #endif
  /// <summary>
  /// Iterator behind the <c>Distinct</c> overloads that take a synchronous key selector.
  /// A source element is yielded only when adding its key to the internal set returns <c>true</c>
  /// (<c>Set&lt;TKey&gt;.Add</c> is assumed to mirror <c>HashSet&lt;T&gt;.Add</c>: true only for a key not yet present).
  /// Elements are produced in source order.
  /// </summary>
  private sealed class DistinctAsyncIterator<TSource, TKey> : AsyncIterator<TSource>, IAsyncIListProvider<TSource>
  {
      private readonly IEqualityComparer<TKey>? _comparer;
      private readonly Func<TSource, TKey> _keySelector;
      private readonly IAsyncEnumerable<TSource> _source;

      // Per-enumeration state: assigned in MoveNextCore (Allocated state) and cleared by DisposeAsync.
      private IAsyncEnumerator<TSource>? _enumerator;
      private Set<TKey>? _set;

      public DistinctAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey>? comparer)
      {
          _source = source;
          _keySelector = keySelector;
          _comparer = comparer;
      }

      /// <summary>Materializes the distinct elements into an array, in source order.</summary>
      public async ValueTask<TSource[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          var distinct = await FillSetAsync(cancellationToken).ConfigureAwait(false);
          return distinct.ToArray();
      }

      /// <summary>Materializes the distinct elements into a list, in source order.</summary>
      public async ValueTask<List<TSource>> ToListAsync(CancellationToken cancellationToken)
      {
          return await FillSetAsync(cancellationToken).ConfigureAwait(false);
      }

      /// <summary>
      /// Counts the distinct elements with a full pass over the source.
      /// Returns -1 when <paramref name="onlyIfCheap"/> is true, since that pass is the only way to count.
      /// </summary>
      public async ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return -1;
          }

          var count = 0;
          var seen = new Set<TKey>(_comparer);

          await foreach (var item in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              if (seen.Add(_keySelector(item)))
              {
                  count++;
              }
          }

          return count;
      }

      public override AsyncIteratorBase<TSource> Clone()
      {
          // New iterator over the same source/selector/comparer, with its own enumeration state.
          return new DistinctAsyncIterator<TSource, TKey>(_source, _keySelector, _comparer);
      }

      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _set = null; // release the key set together with the enumerator
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  if (!await _enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      await DisposeAsync().ConfigureAwait(false);
                      return false;
                  }

                  // The first element is always distinct: seed a fresh set with its key.
                  var element = _enumerator.Current;
                  _set = new Set<TKey>(_comparer);
                  _set.Add(_keySelector(element));
                  _current = element;
                  _state = AsyncIteratorState.Iterating;
                  return true;

              case AsyncIteratorState.Iterating:
                  // Skip elements whose key the set already holds.
                  while (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
                  {
                      element = _enumerator.Current;
                      if (_set!.Add(_keySelector(element)))
                      {
                          _current = element;
                          return true;
                      }
                  }

                  break;
          }

          // Source exhausted (or any state not handled above): release resources and report completion.
          await DisposeAsync().ConfigureAwait(false);
          return false;
      }

      /// <summary>
      /// Enumerates the whole source and collects, in source order, the elements whose key was newly
      /// added to a fresh set. Returns <see cref="ValueTask{TResult}"/> like the helpers of the sibling iterators.
      /// </summary>
      private async ValueTask<List<TSource>> FillSetAsync(CancellationToken cancellationToken)
      {
          var seen = new Set<TKey>(_comparer);
          var result = new List<TSource>();

          await foreach (var item in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              if (seen.Add(_keySelector(item)))
              {
                  result.Add(item);
              }
          }

          return result;
      }
  }
  /// <summary>
  /// Distinct iterator whose keys come from a <see cref="ValueTask{TResult}"/>-returning selector.
  /// Each element's key is awaited; the element is surfaced only when the per-enumeration key set accepts that key.
  /// </summary>
  private sealed class DistinctAsyncIteratorWithTask<TSource, TKey> : AsyncIterator<TSource>, IAsyncIListProvider<TSource>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly Func<TSource, ValueTask<TKey>> _keySelector;
      private readonly IEqualityComparer<TKey>? _comparer;

      // Live enumeration state, populated on the first MoveNextCore and dropped on dispose.
      private IAsyncEnumerator<TSource>? _enumerator;
      private Set<TKey>? _set;

      public DistinctAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer)
      {
          _source = source;
          _keySelector = keySelector;
          _comparer = comparer;
      }

      public async ValueTask<TSource[]> ToArrayAsync(CancellationToken cancellationToken) =>
          (await FillSetAsync(cancellationToken).ConfigureAwait(false)).ToArray();

      public async ValueTask<List<TSource>> ToListAsync(CancellationToken cancellationToken) =>
          await FillSetAsync(cancellationToken).ConfigureAwait(false);

      public async ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              // Counting needs a full pass over the source, so no cheap answer exists.
              return -1;
          }

          var distinctCount = 0;
          var keys = new Set<TKey>(_comparer);

          await foreach (var candidate in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              if (keys.Add(await _keySelector(candidate).ConfigureAwait(false)))
              {
                  distinctCount++;
              }
          }

          return distinctCount;
      }

      public override AsyncIteratorBase<TSource> Clone() =>
          new DistinctAsyncIteratorWithTask<TSource, TKey>(_source, _keySelector, _comparer);

      public override async ValueTask DisposeAsync()
      {
          if (_enumerator is { } active)
          {
              await active.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _set = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      protected override async ValueTask<bool> MoveNextCore()
      {
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);

              if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  // First element: its key goes into a brand-new set and the element is always yielded.
                  var first = _enumerator.Current;
                  _set = new Set<TKey>(_comparer);
                  _set.Add(await _keySelector(first).ConfigureAwait(false));
                  _current = first;
                  _state = AsyncIteratorState.Iterating;
                  return true;
              }
          }
          else if (_state == AsyncIteratorState.Iterating)
          {
              while (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
              {
                  var next = _enumerator.Current;
                  if (_set!.Add(await _keySelector(next).ConfigureAwait(false)))
                  {
                      _current = next;
                      return true;
                  }
              }
          }

          // Empty source, exhausted source, or an unhandled state all end the same way.
          await DisposeAsync().ConfigureAwait(false);
          return false;
      }

      // Full pass over the source that keeps, in order, every element whose awaited key is new to the set.
      private async ValueTask<List<TSource>> FillSetAsync(CancellationToken cancellationToken)
      {
          var keys = new Set<TKey>(_comparer);
          var kept = new List<TSource>();

          await foreach (var candidate in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              if (keys.Add(await _keySelector(candidate).ConfigureAwait(false)))
              {
                  kept.Add(candidate);
              }
          }

          return kept;
      }
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Distinct iterator whose keys come from a cancellable, <see cref="ValueTask{TResult}"/>-returning selector.
  /// The selector receives the token of the current operation: the iterator's own token while enumerating,
  /// or the token handed to the materializing/counting method.
  /// </summary>
  private sealed class DistinctAsyncIteratorWithTaskAndCancellation<TSource, TKey> : AsyncIterator<TSource>, IAsyncIListProvider<TSource>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly Func<TSource, CancellationToken, ValueTask<TKey>> _keySelector;
      private readonly IEqualityComparer<TKey>? _comparer;

      // Live enumeration state, populated on the first MoveNextCore and dropped on dispose.
      private IAsyncEnumerator<TSource>? _enumerator;
      private Set<TKey>? _set;

      public DistinctAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer)
      {
          _source = source;
          _keySelector = keySelector;
          _comparer = comparer;
      }

      public async ValueTask<TSource[]> ToArrayAsync(CancellationToken cancellationToken) =>
          (await FillSetAsync(cancellationToken).ConfigureAwait(false)).ToArray();

      public async ValueTask<List<TSource>> ToListAsync(CancellationToken cancellationToken) =>
          await FillSetAsync(cancellationToken).ConfigureAwait(false);

      public async ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              // Counting needs a full pass over the source, so no cheap answer exists.
              return -1;
          }

          var distinctCount = 0;
          var keys = new Set<TKey>(_comparer);

          await foreach (var candidate in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              if (keys.Add(await _keySelector(candidate, cancellationToken).ConfigureAwait(false)))
              {
                  distinctCount++;
              }
          }

          return distinctCount;
      }

      public override AsyncIteratorBase<TSource> Clone() =>
          new DistinctAsyncIteratorWithTaskAndCancellation<TSource, TKey>(_source, _keySelector, _comparer);

      public override async ValueTask DisposeAsync()
      {
          if (_enumerator is { } active)
          {
              await active.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _set = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      protected override async ValueTask<bool> MoveNextCore()
      {
          if (_state == AsyncIteratorState.Allocated)
          {
              _enumerator = _source.GetAsyncEnumerator(_cancellationToken);

              if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  // First element: its key goes into a brand-new set and the element is always yielded.
                  var first = _enumerator.Current;
                  _set = new Set<TKey>(_comparer);
                  _set.Add(await _keySelector(first, _cancellationToken).ConfigureAwait(false));
                  _current = first;
                  _state = AsyncIteratorState.Iterating;
                  return true;
              }
          }
          else if (_state == AsyncIteratorState.Iterating)
          {
              while (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
              {
                  var next = _enumerator.Current;
                  if (_set!.Add(await _keySelector(next, _cancellationToken).ConfigureAwait(false)))
                  {
                      _current = next;
                      return true;
                  }
              }
          }

          // Empty source, exhausted source, or an unhandled state all end the same way.
          await DisposeAsync().ConfigureAwait(false);
          return false;
      }

      // Full pass over the source that keeps, in order, every element whose awaited key is new to the set.
      private async ValueTask<List<TSource>> FillSetAsync(CancellationToken cancellationToken)
      {
          var keys = new Set<TKey>(_comparer);
          var kept = new List<TSource>();

          await foreach (var candidate in _source.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              if (keys.Add(await _keySelector(candidate, cancellationToken).ConfigureAwait(false)))
              {
                  kept.Add(candidate);
              }
          }

          return kept;
      }
  }
  #endif
  496. }
  497. }