Distinct.cs 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  /// <summary>
  /// Returns a sequence with the distinct elements of <paramref name="source"/>, compared
  /// with <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <returns>The result of the comparer-taking overload called with the default comparer.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Distinct<TSource>(this IAsyncEnumerable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      return Distinct(source, EqualityComparer<TSource>.Default);
  }
  /// <summary>
  /// Returns a sequence with the distinct elements of <paramref name="source"/>, compared
  /// with <paramref name="comparer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="comparer">Comparer used to decide whether two elements are equal.</param>
  /// <returns>A new <c>DistinctAsyncIterator</c> wrapping <paramref name="source"/>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="comparer"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> Distinct<TSource>(this IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (comparer == null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      var iterator = new DistinctAsyncIterator<TSource>(source, comparer);
      return iterator;
  }
  /// <summary>
  /// Returns a sequence with the elements of <paramref name="source"/> whose keys are distinct,
  /// comparing keys with <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function that computes the key of an element.</param>
  /// <returns>The result of the comparer-taking overload called with the default key comparer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="keySelector"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      return Distinct(source, keySelector, EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Returns a sequence with the elements of <paramref name="source"/> whose keys are distinct,
  /// comparing keys with <paramref name="comparer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function that computes the key of an element.</param>
  /// <param name="comparer">Comparer used to decide whether two keys are equal.</param>
  /// <returns>A new keyed <c>DistinctAsyncIterator</c> wrapping <paramref name="source"/>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (comparer == null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      var iterator = new DistinctAsyncIterator<TSource, TKey>(source, keySelector, comparer);
      return iterator;
  }
  /// <summary>
  /// Returns a sequence with the elements of <paramref name="source"/> whose asynchronously
  /// computed keys are distinct, comparing keys with <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by the selector's task.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function returning a task that produces the key of an element.</param>
  /// <returns>The result of the comparer-taking overload called with the default key comparer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="keySelector"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      return Distinct(source, keySelector, EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Returns a sequence with the elements of <paramref name="source"/> whose asynchronously
  /// computed keys are distinct, comparing keys with <paramref name="comparer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by the selector's task.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function returning a task that produces the key of an element.</param>
  /// <param name="comparer">Comparer used to decide whether two keys are equal.</param>
  /// <returns>A new <c>DistinctAsyncIteratorWithTask</c> wrapping <paramref name="source"/>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (comparer == null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      var iterator = new DistinctAsyncIteratorWithTask<TSource, TKey>(source, keySelector, comparer);
      return iterator;
  }
  /// <summary>
  /// Returns a sequence that drops elements equal to the previously yielded element, compared
  /// with <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <returns>The result of the comparer-taking overload called with the default comparer.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource>(this IAsyncEnumerable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      return DistinctUntilChanged(source, EqualityComparer<TSource>.Default);
  }
  /// <summary>
  /// Returns a sequence that drops elements equal to the previously yielded element, compared
  /// with <paramref name="comparer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="comparer">Comparer used to decide whether two elements are equal.</param>
  /// <returns>A new <c>DistinctUntilChangedAsyncIterator</c> wrapping <paramref name="source"/>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="comparer"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource>(this IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (comparer == null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      var iterator = new DistinctUntilChangedAsyncIterator<TSource>(source, comparer);
      return iterator;
  }
  /// <summary>
  /// Returns a sequence that drops elements whose key equals the key of the previously yielded
  /// element, comparing keys with <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function that computes the key of an element.</param>
  /// <returns>The result of the private <c>DistinctUntilChanged_</c> helper with the default key comparer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="keySelector"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      return DistinctUntilChanged_(source, keySelector, EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Returns a sequence that drops elements whose key equals the key of the previously yielded
  /// element, comparing keys with <paramref name="comparer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function that computes the key of an element.</param>
  /// <param name="comparer">Comparer used to decide whether two keys are equal.</param>
  /// <returns>The result of the private <c>DistinctUntilChanged_</c> helper.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (comparer == null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return DistinctUntilChanged_(source, keySelector, comparer);
  }
  /// <summary>
  /// Returns a sequence that drops elements whose asynchronously computed key equals the key of
  /// the previously yielded element, comparing keys with <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by the selector's task.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function returning a task that produces the key of an element.</param>
  /// <returns>The result of the private <c>DistinctUntilChanged_</c> helper with the default key comparer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="keySelector"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      return DistinctUntilChanged_(source, keySelector, EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Returns a sequence that drops elements whose asynchronously computed key equals the key of
  /// the previously yielded element, comparing keys with <paramref name="comparer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by the selector's task.</typeparam>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function returning a task that produces the key of an element.</param>
  /// <param name="comparer">Comparer used to decide whether two keys are equal.</param>
  /// <returns>The result of the private <c>DistinctUntilChanged_</c> helper.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
  /// </exception>
  public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (comparer == null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return DistinctUntilChanged_(source, keySelector, comparer);
  }
  /// <summary>
  /// Builds the keyed <c>DistinctUntilChangedAsyncIterator</c> for a synchronous key selector.
  /// Performs no argument validation; the public overloads check for null before calling this.
  /// </summary>
  private static IAsyncEnumerable<TSource> DistinctUntilChanged_<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
      => new DistinctUntilChangedAsyncIterator<TSource, TKey>(source, keySelector, comparer);
  /// <summary>
  /// Builds the <c>DistinctUntilChangedAsyncIteratorWithTask</c> for a task-returning key selector.
  /// Performs no argument validation; the public overloads check for null before calling this.
  /// </summary>
  private static IAsyncEnumerable<TSource> DistinctUntilChanged_<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
      => new DistinctUntilChangedAsyncIteratorWithTask<TSource, TKey>(source, keySelector, comparer);
  /// <summary>
  /// Iterator behind the element-based <c>Distinct</c> overloads. It yields a source element
  /// only when adding it to an internal <c>Set</c> (built with the supplied comparer) returns
  /// <c>true</c>.
  /// </summary>
  private sealed class DistinctAsyncIterator<TSource> : AsyncIterator<TSource>, IIListProvider<TSource>
  {
      private readonly IEqualityComparer<TSource> comparer;
      private readonly IAsyncEnumerable<TSource> source;
      private IAsyncEnumerator<TSource> enumerator;
      private Set<TSource> set;

      /// <summary>Stores the source and comparer; no enumeration starts here.</summary>
      public DistinctAsyncIterator(IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
      {
          Debug.Assert(source != null);
          // Same precondition the sibling iterators assert; public callers already validate it.
          Debug.Assert(comparer != null);

          this.source = source;
          this.comparer = comparer;
      }

      /// <summary>Enumerates the whole source into a fresh set and returns the set as an array.</summary>
      public async Task<TSource[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          var filled = await FillSetAsync(cancellationToken).ConfigureAwait(false);
          return filled.ToArray();
      }

      /// <summary>Enumerates the whole source into a fresh set and returns the set as a list.</summary>
      public async Task<List<TSource>> ToListAsync(CancellationToken cancellationToken)
      {
          var filled = await FillSetAsync(cancellationToken).ConfigureAwait(false);
          return filled.ToList();
      }

      /// <summary>
      /// Returns -1 when <paramref name="onlyIfCheap"/> is true, because counting requires a
      /// full enumeration. Otherwise returns the count of the set built from the whole source.
      /// </summary>
      public async Task<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return -1;
          }

          var filled = await FillSetAsync(cancellationToken).ConfigureAwait(false);
          return filled.Count;
      }

      /// <summary>Creates a new iterator over the same source and comparer.</summary>
      public override AsyncIterator<TSource> Clone()
      {
          return new DistinctAsyncIterator<TSource>(source, comparer);
      }

      /// <summary>
      /// Disposes the source enumerator (if one was created), drops the set, then runs the base
      /// disposal.
      /// </summary>
      public override async Task DisposeAsync()
      {
          if (enumerator != null)
          {
              await enumerator.DisposeAsync().ConfigureAwait(false);
              enumerator = null;
              set = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next element accepted by the set. Returns false, after disposing, once
      /// the source is exhausted.
      /// </summary>
      protected override async Task<bool> MoveNextCore()
      {
          switch (state)
          {
              case AsyncIteratorState.Allocated:
                  enumerator = source.GetAsyncEnumerator();
                  if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      await DisposeAsync().ConfigureAwait(false);
                      return false;
                  }

                  // The first element is yielded unconditionally; the set is only created
                  // once the source is known to be non-empty.
                  var item = enumerator.Current;
                  set = new Set<TSource>(comparer);
                  set.Add(item);
                  current = item;
                  state = AsyncIteratorState.Iterating;
                  return true;

              case AsyncIteratorState.Iterating:
                  while (await enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      item = enumerator.Current;
                      if (set.Add(item))
                      {
                          current = item;
                          return true;
                      }
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }

      /// <summary>
      /// Enumerates the source with its own enumerator (independent of iteration state), adding
      /// every element to a new set, and always disposes that enumerator.
      /// </summary>
      private async Task<Set<TSource>> FillSetAsync(CancellationToken cancellationToken)
      {
          var result = new Set<TSource>(comparer);
          var e = source.GetAsyncEnumerator();
          try
          {
              while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
              {
                  result.Add(e.Current);
              }
          }
          finally
          {
              await e.DisposeAsync().ConfigureAwait(false);
          }

          return result;
      }
  }
  /// <summary>
  /// Iterator behind the key-based <c>Distinct</c> overloads with a synchronous key selector.
  /// It yields a source element only when adding its key to an internal <c>Set</c> (built with
  /// the supplied comparer) returns <c>true</c>.
  /// </summary>
  private sealed class DistinctAsyncIterator<TSource, TKey> : AsyncIterator<TSource>, IIListProvider<TSource>
  {
      private readonly IEqualityComparer<TKey> comparer;
      private readonly Func<TSource, TKey> keySelector;
      private readonly IAsyncEnumerable<TSource> source;
      private IAsyncEnumerator<TSource> enumerator;
      private Set<TKey> set;

      /// <summary>Stores the source, key selector and comparer; no enumeration starts here.</summary>
      public DistinctAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
      {
          Debug.Assert(source != null);
          Debug.Assert(keySelector != null);
          Debug.Assert(comparer != null);

          this.source = source;
          this.keySelector = keySelector;
          this.comparer = comparer;
      }

      /// <summary>Enumerates the whole source and returns the accepted elements as an array.</summary>
      public async Task<TSource[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          var accepted = await FillSetAsync(cancellationToken).ConfigureAwait(false);
          return accepted.ToArray();
      }

      /// <summary>Enumerates the whole source and returns the accepted elements as a list.</summary>
      public async Task<List<TSource>> ToListAsync(CancellationToken cancellationToken)
      {
          var accepted = await FillSetAsync(cancellationToken).ConfigureAwait(false);
          return accepted;
      }

      /// <summary>
      /// Returns -1 when <paramref name="onlyIfCheap"/> is true. Otherwise enumerates the source
      /// and counts the elements whose key is accepted by a fresh set, without buffering them.
      /// </summary>
      public async Task<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return -1;
          }

          var count = 0;
          var seen = new Set<TKey>(comparer);
          var e = source.GetAsyncEnumerator();
          try
          {
              // Pass the token through, as FillSetAsync does; previously it was ignored here,
              // so a cancelled count would still enumerate the source.
              while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
              {
                  if (seen.Add(keySelector(e.Current)))
                  {
                      count++;
                  }
              }
          }
          finally
          {
              await e.DisposeAsync().ConfigureAwait(false);
          }

          return count;
      }

      /// <summary>Creates a new iterator over the same source, key selector and comparer.</summary>
      public override AsyncIterator<TSource> Clone()
      {
          return new DistinctAsyncIterator<TSource, TKey>(source, keySelector, comparer);
      }

      /// <summary>
      /// Disposes the source enumerator (if one was created), drops the set, then runs the base
      /// disposal.
      /// </summary>
      public override async Task DisposeAsync()
      {
          if (enumerator != null)
          {
              await enumerator.DisposeAsync().ConfigureAwait(false);
              enumerator = null;
              set = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next element whose key is accepted by the set. Returns false, after
      /// disposing, once the source is exhausted.
      /// </summary>
      protected override async Task<bool> MoveNextCore()
      {
          switch (state)
          {
              case AsyncIteratorState.Allocated:
                  enumerator = source.GetAsyncEnumerator();
                  if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      await DisposeAsync().ConfigureAwait(false);
                      return false;
                  }

                  // The first element is yielded unconditionally; its key seeds the set.
                  var item = enumerator.Current;
                  set = new Set<TKey>(comparer);
                  set.Add(keySelector(item));
                  current = item;
                  state = AsyncIteratorState.Iterating;
                  return true;

              case AsyncIteratorState.Iterating:
                  while (await enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      item = enumerator.Current;
                      if (set.Add(keySelector(item)))
                      {
                          current = item;
                          return true;
                      }
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }

      /// <summary>
      /// Enumerates the source with its own enumerator (independent of iteration state) and
      /// collects, in source order, the elements whose key is accepted by a fresh set. The
      /// enumerator is always disposed.
      /// </summary>
      private async Task<List<TSource>> FillSetAsync(CancellationToken cancellationToken)
      {
          var seen = new Set<TKey>(comparer);
          var accepted = new List<TSource>();
          var e = source.GetAsyncEnumerator();
          try
          {
              while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
              {
                  var item = e.Current;
                  if (seen.Add(keySelector(item)))
                  {
                      accepted.Add(item);
                  }
              }
          }
          finally
          {
              await e.DisposeAsync().ConfigureAwait(false);
          }

          return accepted;
      }
  }
  /// <summary>
  /// Iterator behind the key-based <c>Distinct</c> overloads with a task-returning key selector.
  /// It yields a source element only when adding its awaited key to an internal <c>Set</c>
  /// (built with the supplied comparer) returns <c>true</c>.
  /// </summary>
  private sealed class DistinctAsyncIteratorWithTask<TSource, TKey> : AsyncIterator<TSource>, IIListProvider<TSource>
  {
      private readonly IEqualityComparer<TKey> comparer;
      private readonly Func<TSource, Task<TKey>> keySelector;
      private readonly IAsyncEnumerable<TSource> source;
      private IAsyncEnumerator<TSource> enumerator;
      private Set<TKey> set;

      /// <summary>Stores the source, key selector and comparer; no enumeration starts here.</summary>
      public DistinctAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
      {
          Debug.Assert(source != null);
          Debug.Assert(keySelector != null);
          Debug.Assert(comparer != null);

          this.source = source;
          this.keySelector = keySelector;
          this.comparer = comparer;
      }

      /// <summary>Enumerates the whole source and returns the accepted elements as an array.</summary>
      public async Task<TSource[]> ToArrayAsync(CancellationToken cancellationToken)
      {
          var accepted = await FillSetAsync(cancellationToken).ConfigureAwait(false);
          return accepted.ToArray();
      }

      /// <summary>Enumerates the whole source and returns the accepted elements as a list.</summary>
      public async Task<List<TSource>> ToListAsync(CancellationToken cancellationToken)
      {
          var accepted = await FillSetAsync(cancellationToken).ConfigureAwait(false);
          return accepted;
      }

      /// <summary>
      /// Returns -1 when <paramref name="onlyIfCheap"/> is true. Otherwise enumerates the source
      /// and counts the elements whose awaited key is accepted by a fresh set, without buffering
      /// them.
      /// </summary>
      public async Task<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
      {
          if (onlyIfCheap)
          {
              return -1;
          }

          var count = 0;
          var seen = new Set<TKey>(comparer);
          var e = source.GetAsyncEnumerator();
          try
          {
              // Pass the token through, as FillSetAsync does; previously it was ignored here,
              // so a cancelled count would still enumerate the source.
              while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
              {
                  var key = await keySelector(e.Current).ConfigureAwait(false);
                  if (seen.Add(key))
                  {
                      count++;
                  }
              }
          }
          finally
          {
              await e.DisposeAsync().ConfigureAwait(false);
          }

          return count;
      }

      /// <summary>Creates a new iterator over the same source, key selector and comparer.</summary>
      public override AsyncIterator<TSource> Clone()
      {
          return new DistinctAsyncIteratorWithTask<TSource, TKey>(source, keySelector, comparer);
      }

      /// <summary>
      /// Disposes the source enumerator (if one was created), drops the set, then runs the base
      /// disposal.
      /// </summary>
      public override async Task DisposeAsync()
      {
          if (enumerator != null)
          {
              await enumerator.DisposeAsync().ConfigureAwait(false);
              enumerator = null;
              set = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next element whose awaited key is accepted by the set. Returns false,
      /// after disposing, once the source is exhausted.
      /// </summary>
      protected override async Task<bool> MoveNextCore()
      {
          switch (state)
          {
              case AsyncIteratorState.Allocated:
                  enumerator = source.GetAsyncEnumerator();
                  if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      await DisposeAsync().ConfigureAwait(false);
                      return false;
                  }

                  // The first element is yielded unconditionally; its key seeds the set.
                  var item = enumerator.Current;
                  set = new Set<TKey>(comparer);
                  set.Add(await keySelector(item).ConfigureAwait(false));
                  current = item;
                  state = AsyncIteratorState.Iterating;
                  return true;

              case AsyncIteratorState.Iterating:
                  while (await enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      item = enumerator.Current;
                      if (set.Add(await keySelector(item).ConfigureAwait(false)))
                      {
                          current = item;
                          return true;
                      }
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }

      /// <summary>
      /// Enumerates the source with its own enumerator (independent of iteration state) and
      /// collects, in source order, the elements whose awaited key is accepted by a fresh set.
      /// The enumerator is always disposed.
      /// </summary>
      private async Task<List<TSource>> FillSetAsync(CancellationToken cancellationToken)
      {
          var seen = new Set<TKey>(comparer);
          var accepted = new List<TSource>();
          var e = source.GetAsyncEnumerator();
          try
          {
              while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
              {
                  var item = e.Current;
                  if (seen.Add(await keySelector(item).ConfigureAwait(false)))
                  {
                      accepted.Add(item);
                  }
              }
          }
          finally
          {
              await e.DisposeAsync().ConfigureAwait(false);
          }

          return accepted;
      }
  }
  /// <summary>
  /// Iterator behind the element-based <c>DistinctUntilChanged</c> overloads. It yields a source
  /// element when nothing has been yielded yet, or when the comparer reports it as different
  /// from the most recently yielded element.
  /// </summary>
  private sealed class DistinctUntilChangedAsyncIterator<TSource> : AsyncIterator<TSource>
  {
      private readonly IEqualityComparer<TSource> comparer;
      private readonly IAsyncEnumerable<TSource> source;
      private TSource currentValue;
      private IAsyncEnumerator<TSource> enumerator;
      private bool hasCurrentValue;

      /// <summary>Stores the source and comparer; no enumeration starts here.</summary>
      public DistinctUntilChangedAsyncIterator(IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
      {
          Debug.Assert(comparer != null);
          Debug.Assert(source != null);

          this.source = source;
          this.comparer = comparer;
      }

      /// <summary>Creates a new iterator over the same source and comparer.</summary>
      public override AsyncIterator<TSource> Clone()
          => new DistinctUntilChangedAsyncIterator<TSource>(source, comparer);

      /// <summary>
      /// Disposes the source enumerator (if one was created), clears the remembered value, then
      /// runs the base disposal.
      /// </summary>
      public override async Task DisposeAsync()
      {
          if (enumerator != null)
          {
              await enumerator.DisposeAsync().ConfigureAwait(false);
              enumerator = null;
              currentValue = default(TSource);
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next element that differs from the last yielded one. Returns false,
      /// after disposing, once the source is exhausted.
      /// </summary>
      protected override async Task<bool> MoveNextCore()
      {
          if (state == AsyncIteratorState.Allocated)
          {
              // First call: open the source and continue straight into the iterating phase.
              enumerator = source.GetAsyncEnumerator();
              state = AsyncIteratorState.Iterating;
          }

          if (state == AsyncIteratorState.Iterating)
          {
              while (await enumerator.MoveNextAsync().ConfigureAwait(false))
              {
                  var candidate = enumerator.Current;

                  // The comparer is consulted only once a previous value exists.
                  if (!hasCurrentValue || !comparer.Equals(currentValue, candidate))
                  {
                      hasCurrentValue = true;
                      currentValue = candidate;
                      current = candidate;
                      return true;
                  }
              }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator behind the key-based <c>DistinctUntilChanged</c> overloads with a synchronous key
  /// selector. It yields a source element when nothing has been yielded yet, or when the
  /// comparer reports its key as different from the key of the most recently yielded element.
  /// </summary>
  private sealed class DistinctUntilChangedAsyncIterator<TSource, TKey> : AsyncIterator<TSource>
  {
      private readonly IEqualityComparer<TKey> comparer;
      private readonly Func<TSource, TKey> keySelector;
      private readonly IAsyncEnumerable<TSource> source;
      private TKey currentKeyValue;
      private IAsyncEnumerator<TSource> enumerator;
      private bool hasCurrentKey;

      /// <summary>Stores the source, key selector and comparer; no enumeration starts here.</summary>
      public DistinctUntilChangedAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
      {
          // Same preconditions the sibling iterators assert; public callers already validate them.
          Debug.Assert(source != null);
          Debug.Assert(keySelector != null);
          Debug.Assert(comparer != null);

          this.source = source;
          this.keySelector = keySelector;
          this.comparer = comparer;
      }

      /// <summary>Creates a new iterator over the same source, key selector and comparer.</summary>
      public override AsyncIterator<TSource> Clone()
      {
          return new DistinctUntilChangedAsyncIterator<TSource, TKey>(source, keySelector, comparer);
      }

      /// <summary>
      /// Disposes the source enumerator (if one was created), clears the remembered key, then
      /// runs the base disposal.
      /// </summary>
      public override async Task DisposeAsync()
      {
          if (enumerator != null)
          {
              await enumerator.DisposeAsync().ConfigureAwait(false);
              enumerator = null;
              currentKeyValue = default(TKey);
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next element whose key differs from the last yielded element's key.
      /// Returns false, after disposing, once the source is exhausted.
      /// </summary>
      protected override async Task<bool> MoveNextCore()
      {
          switch (state)
          {
              case AsyncIteratorState.Allocated:
                  enumerator = source.GetAsyncEnumerator();
                  state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  while (await enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      var item = enumerator.Current;
                      var key = keySelector(item);

                      // The comparer is consulted only once a previous key exists.
                      if (!hasCurrentKey || !comparer.Equals(currentKeyValue, key))
                      {
                          hasCurrentKey = true;
                          currentKeyValue = key;
                          current = item;
                          return true;
                      }
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator behind the key-based <c>DistinctUntilChanged</c> overloads with a task-returning
  /// key selector. It yields a source element when nothing has been yielded yet, or when the
  /// comparer reports its awaited key as different from the key of the most recently yielded
  /// element.
  /// </summary>
  private sealed class DistinctUntilChangedAsyncIteratorWithTask<TSource, TKey> : AsyncIterator<TSource>
  {
      private readonly IEqualityComparer<TKey> comparer;
      private readonly Func<TSource, Task<TKey>> keySelector;
      private readonly IAsyncEnumerable<TSource> source;
      private TKey currentKeyValue;
      private IAsyncEnumerator<TSource> enumerator;
      private bool hasCurrentKey;

      /// <summary>Stores the source, key selector and comparer; no enumeration starts here.</summary>
      public DistinctUntilChangedAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
      {
          // Same preconditions the sibling iterators assert; public callers already validate them.
          Debug.Assert(source != null);
          Debug.Assert(keySelector != null);
          Debug.Assert(comparer != null);

          this.source = source;
          this.keySelector = keySelector;
          this.comparer = comparer;
      }

      /// <summary>Creates a new iterator over the same source, key selector and comparer.</summary>
      public override AsyncIterator<TSource> Clone()
      {
          return new DistinctUntilChangedAsyncIteratorWithTask<TSource, TKey>(source, keySelector, comparer);
      }

      /// <summary>
      /// Disposes the source enumerator (if one was created), clears the remembered key, then
      /// runs the base disposal.
      /// </summary>
      public override async Task DisposeAsync()
      {
          if (enumerator != null)
          {
              await enumerator.DisposeAsync().ConfigureAwait(false);
              enumerator = null;
              currentKeyValue = default(TKey);
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next element whose awaited key differs from the last yielded element's
      /// key. Returns false, after disposing, once the source is exhausted.
      /// </summary>
      protected override async Task<bool> MoveNextCore()
      {
          switch (state)
          {
              case AsyncIteratorState.Allocated:
                  enumerator = source.GetAsyncEnumerator();
                  state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  while (await enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      var item = enumerator.Current;
                      var key = await keySelector(item).ConfigureAwait(false);

                      // The comparer is consulted only once a previous key exists.
                      if (!hasCurrentKey || !comparer.Equals(currentKeyValue, key))
                      {
                          hasCurrentKey = true;
                          currentKeyValue = key;
                          current = item;
                          return true;
                      }
                  }

                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  632. }
  633. }