|
|
@@ -4,6 +4,7 @@
|
|
|
|
|
|
using System;
|
|
|
using System.Collections.Generic;
|
|
|
+using System.Diagnostics;
|
|
|
using System.Linq;
|
|
|
using System.Threading;
|
|
|
using System.Threading.Tasks;
|
|
|
@@ -21,11 +22,7 @@ namespace System.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Defer(() =>
|
|
|
- {
|
|
|
- var set = new Set<TKey>(comparer);
|
|
|
- return source.Where(item => set.Add(keySelector(item)));
|
|
|
- });
|
|
|
+ return new DistinctAsyncIterator<TSource, TKey>(source, keySelector, comparer);
|
|
|
}
|
|
|
|
|
|
public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
|
|
|
@@ -45,11 +42,7 @@ namespace System.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Defer(() =>
|
|
|
- {
|
|
|
- var set = new HashSet<TSource>(comparer);
|
|
|
- return source.Where(set.Add);
|
|
|
- });
|
|
|
+ return new DistinctAsyncIterator<TSource>(source, comparer);
|
|
|
}
|
|
|
|
|
|
public static IAsyncEnumerable<TSource> Distinct<TSource>(this IAsyncEnumerable<TSource> source)
|
|
|
@@ -65,7 +58,7 @@ namespace System.Linq
|
|
|
if (source == null)
|
|
|
throw new ArgumentNullException(nameof(source));
|
|
|
|
|
|
- return source.DistinctUntilChanged_(x => x, EqualityComparer<TSource>.Default);
|
|
|
+ return source.DistinctUntilChanged(EqualityComparer<TSource>.Default);
|
|
|
}
|
|
|
|
|
|
public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource>(this IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
|
|
|
@@ -75,7 +68,7 @@ namespace System.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return source.DistinctUntilChanged_(x => x, comparer);
|
|
|
+ return new DistinctUntilChangedAsyncIterator<TSource>(source, comparer);
|
|
|
}
|
|
|
|
|
|
public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
|
|
|
@@ -102,56 +95,375 @@ namespace System.Linq
|
|
|
|
|
|
private static IAsyncEnumerable<TSource> DistinctUntilChanged_<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
|
|
|
{
|
|
|
- return CreateEnumerable(
|
|
|
- () =>
|
|
|
+ return new DistinctUntilChangedAsyncIterator<TSource, TKey>(source, keySelector, comparer);
|
|
|
+ }
|
|
|
+
|
|
|
+ private sealed class DistinctAsyncIterator<TSource, TKey> : AsyncIterator<TSource>, IIListProvider<TSource>
|
|
|
+ {
|
|
|
+ private readonly IEqualityComparer<TKey> comparer;
|
|
|
+ private readonly Func<TSource, TKey> keySelector;
|
|
|
+ private readonly IAsyncEnumerable<TSource> source;
|
|
|
+
|
|
|
+ private IAsyncEnumerator<TSource> enumerator;
|
|
|
+ private Set<TKey> set;
|
|
|
+
|
|
|
+ public DistinctAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
|
|
|
+ {
|
|
|
+ Debug.Assert(source != null);
|
|
|
+
|
|
|
+ this.source = source;
|
|
|
+ this.keySelector = keySelector;
|
|
|
+ this.comparer = comparer;
|
|
|
+ }
|
|
|
+
|
|
|
+ public async Task<TSource[]> ToArrayAsync(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var s = await FillSet(cancellationToken)
|
|
|
+ .ConfigureAwait(false);
|
|
|
+ return s.ToArray();
|
|
|
+ }
|
|
|
+
|
|
|
+ public async Task<List<TSource>> ToListAsync(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var s = await FillSet(cancellationToken)
|
|
|
+ .ConfigureAwait(false);
|
|
|
+ return s;
|
|
|
+ }
|
|
|
+
|
|
|
+ public async Task<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ return onlyIfCheap ? -1 : (await FillSet(cancellationToken)
|
|
|
+ .ConfigureAwait(false)).Count;
|
|
|
+ }
|
|
|
+
|
|
|
+ public override AsyncIterator<TSource> Clone()
|
|
|
+ {
|
|
|
+ return new DistinctAsyncIterator<TSource, TKey>(source, keySelector, comparer);
|
|
|
+ }
|
|
|
+
|
|
|
+ public override void Dispose()
|
|
|
+ {
|
|
|
+ if (enumerator != null)
|
|
|
{
|
|
|
- var e = source.GetEnumerator();
|
|
|
+ enumerator.Dispose();
|
|
|
+ enumerator = null;
|
|
|
+ set = null;
|
|
|
+ }
|
|
|
|
|
|
- var cts = new CancellationTokenDisposable();
|
|
|
- var d = Disposable.Create(cts, e);
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
|
|
|
- var currentKey = default(TKey);
|
|
|
- var hasCurrentKey = false;
|
|
|
- var current = default(TSource);
|
|
|
+ protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ switch (state)
|
|
|
+ {
|
|
|
+ case AsyncIteratorState.Allocated:
|
|
|
+ enumerator = source.GetEnumerator();
|
|
|
+ if (!await enumerator.MoveNext(cancellationToken)
|
|
|
+ .ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ Dispose();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ var element = enumerator.Current;
|
|
|
+ set = new Set<TKey>(comparer);
|
|
|
+ set.Add(keySelector(element));
|
|
|
+ current = element;
|
|
|
+ state = AsyncIteratorState.Iterating;
|
|
|
+ return true;
|
|
|
|
|
|
- var f = default(Func<CancellationToken, Task<bool>>);
|
|
|
- f = async ct =>
|
|
|
+ case AsyncIteratorState.Iterating:
|
|
|
+ while (await enumerator.MoveNext(cancellationToken)
|
|
|
+ .ConfigureAwait(false))
|
|
|
{
|
|
|
- if (await e.MoveNext(ct)
|
|
|
- .ConfigureAwait(false))
|
|
|
+ element = enumerator.Current;
|
|
|
+ if (set.Add(keySelector(element)))
|
|
|
{
|
|
|
- var item = e.Current;
|
|
|
- var key = default(TKey);
|
|
|
- var comparerEquals = false;
|
|
|
-
|
|
|
- key = keySelector(item);
|
|
|
-
|
|
|
- if (hasCurrentKey)
|
|
|
- {
|
|
|
- comparerEquals = comparer.Equals(currentKey, key);
|
|
|
- }
|
|
|
-
|
|
|
- if (!hasCurrentKey || !comparerEquals)
|
|
|
- {
|
|
|
- hasCurrentKey = true;
|
|
|
- currentKey = key;
|
|
|
-
|
|
|
- current = item;
|
|
|
- return true;
|
|
|
- }
|
|
|
- return await f(ct)
|
|
|
- .ConfigureAwait(false);
|
|
|
+ current = element;
|
|
|
+ return true;
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ Dispose();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<List<TSource>> FillSet(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var s = new Set<TKey>(comparer);
|
|
|
+ var r = new List<TSource>();
|
|
|
+ using (var enu = source.GetEnumerator())
|
|
|
+ {
|
|
|
+ while (await enu.MoveNext(cancellationToken)
|
|
|
+ .ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ var item = enu.Current;
|
|
|
+ if (s.Add(keySelector(item)))
|
|
|
+ {
|
|
|
+ r.Add(item);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return r;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private sealed class DistinctAsyncIterator<TSource> : AsyncIterator<TSource>, IIListProvider<TSource>
|
|
|
+ {
|
|
|
+ private readonly IEqualityComparer<TSource> comparer;
|
|
|
+ private readonly IAsyncEnumerable<TSource> source;
|
|
|
+
|
|
|
+ private IAsyncEnumerator<TSource> enumerator;
|
|
|
+ private Set<TSource> set;
|
|
|
+
|
|
|
+ public DistinctAsyncIterator(IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
|
|
|
+ {
|
|
|
+ Debug.Assert(source != null);
|
|
|
+
|
|
|
+ this.source = source;
|
|
|
+ this.comparer = comparer;
|
|
|
+ }
|
|
|
+
|
|
|
+ public async Task<TSource[]> ToArrayAsync(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var s = await FillSet(cancellationToken)
|
|
|
+ .ConfigureAwait(false);
|
|
|
+ return s.ToArray();
|
|
|
+ }
|
|
|
+
|
|
|
+ public async Task<List<TSource>> ToListAsync(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var s = await FillSet(cancellationToken)
|
|
|
+ .ConfigureAwait(false);
|
|
|
+ return s.ToList();
|
|
|
+ }
|
|
|
+
|
|
|
+ public async Task<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ return onlyIfCheap ? -1 : (await FillSet(cancellationToken)
|
|
|
+ .ConfigureAwait(false)).Count;
|
|
|
+ }
|
|
|
+
|
|
|
+ public override AsyncIterator<TSource> Clone()
|
|
|
+ {
|
|
|
+ return new DistinctAsyncIterator<TSource>(source, comparer);
|
|
|
+ }
|
|
|
+
|
|
|
+ public override void Dispose()
|
|
|
+ {
|
|
|
+ if (enumerator != null)
|
|
|
+ {
|
|
|
+ enumerator.Dispose();
|
|
|
+ enumerator = null;
|
|
|
+ set = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ switch (state)
|
|
|
+ {
|
|
|
+ case AsyncIteratorState.Allocated:
|
|
|
+ enumerator = source.GetEnumerator();
|
|
|
+ if (!await enumerator.MoveNext(cancellationToken)
|
|
|
+ .ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ Dispose();
|
|
|
return false;
|
|
|
- };
|
|
|
-
|
|
|
- return CreateEnumerator(
|
|
|
- f,
|
|
|
- () => current,
|
|
|
- d.Dispose,
|
|
|
- e
|
|
|
- );
|
|
|
- });
|
|
|
+ }
|
|
|
+
|
|
|
+ var element = enumerator.Current;
|
|
|
+ set = new Set<TSource>(comparer);
|
|
|
+ set.Add(element);
|
|
|
+ current = element;
|
|
|
+ state = AsyncIteratorState.Iterating;
|
|
|
+ return true;
|
|
|
+
|
|
|
+ case AsyncIteratorState.Iterating:
|
|
|
+ while (await enumerator.MoveNext(cancellationToken)
|
|
|
+ .ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ element = enumerator.Current;
|
|
|
+ if (set.Add(element))
|
|
|
+ {
|
|
|
+ current = element;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ Dispose();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<Set<TSource>> FillSet(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var s = new Set<TSource>(comparer);
|
|
|
+ using (var enu = source.GetEnumerator())
|
|
|
+ {
|
|
|
+ while (await enu.MoveNext(cancellationToken)
|
|
|
+ .ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ s.Add(enu.Current);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return s;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private sealed class DistinctUntilChangedAsyncIterator<TSource> : AsyncIterator<TSource>
|
|
|
+ {
|
|
|
+ private readonly IEqualityComparer<TSource> comparer;
|
|
|
+ private readonly IAsyncEnumerable<TSource> source;
|
|
|
+ private TSource currentValue;
|
|
|
+
|
|
|
+ private IAsyncEnumerator<TSource> enumerator;
|
|
|
+ private bool hasCurrentValue;
|
|
|
+
|
|
|
+ public DistinctUntilChangedAsyncIterator(IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
|
|
|
+ {
|
|
|
+ this.source = source;
|
|
|
+ this.comparer = comparer;
|
|
|
+ }
|
|
|
+
|
|
|
+ public override AsyncIterator<TSource> Clone()
|
|
|
+ {
|
|
|
+ return new DistinctUntilChangedAsyncIterator<TSource>(source, comparer);
|
|
|
+ }
|
|
|
+
|
|
|
+ public override void Dispose()
|
|
|
+ {
|
|
|
+ if (enumerator != null)
|
|
|
+ {
|
|
|
+ enumerator.Dispose();
|
|
|
+ enumerator = null;
|
|
|
+ currentValue = default(TSource);
|
|
|
+ }
|
|
|
+
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ switch (state)
|
|
|
+ {
|
|
|
+ case AsyncIteratorState.Allocated:
|
|
|
+ enumerator = source.GetEnumerator();
|
|
|
+ state = AsyncIteratorState.Iterating;
|
|
|
+ goto case AsyncIteratorState.Iterating;
|
|
|
+
|
|
|
+ case AsyncIteratorState.Iterating:
|
|
|
+ if (await enumerator.MoveNext(cancellationToken)
|
|
|
+ .ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ var item = enumerator.Current;
|
|
|
+ var comparerEquals = false;
|
|
|
+
|
|
|
+ if (hasCurrentValue)
|
|
|
+ {
|
|
|
+ comparerEquals = comparer.Equals(currentValue, item);
|
|
|
+ }
|
|
|
+ if (!hasCurrentValue || !comparerEquals)
|
|
|
+ {
|
|
|
+ hasCurrentValue = true;
|
|
|
+ currentValue = item;
|
|
|
+ current = item;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ goto case AsyncIteratorState.Iterating; // loop
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ Dispose();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private sealed class DistinctUntilChangedAsyncIterator<TSource, TKey> : AsyncIterator<TSource>
|
|
|
+ {
|
|
|
+ private readonly IEqualityComparer<TKey> comparer;
|
|
|
+ private readonly Func<TSource, TKey> keySelector;
|
|
|
+ private readonly IAsyncEnumerable<TSource> source;
|
|
|
+ private TKey currentKeyValue;
|
|
|
+
|
|
|
+ private IAsyncEnumerator<TSource> enumerator;
|
|
|
+ private bool hasCurrentKey;
|
|
|
+
|
|
|
+ public DistinctUntilChangedAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
|
|
|
+ {
|
|
|
+ this.source = source;
|
|
|
+ this.keySelector = keySelector;
|
|
|
+ this.comparer = comparer;
|
|
|
+ }
|
|
|
+
|
|
|
+ public override AsyncIterator<TSource> Clone()
|
|
|
+ {
|
|
|
+ return new DistinctUntilChangedAsyncIterator<TSource, TKey>(source, keySelector, comparer);
|
|
|
+ }
|
|
|
+
|
|
|
+ public override void Dispose()
|
|
|
+ {
|
|
|
+ if (enumerator != null)
|
|
|
+ {
|
|
|
+ enumerator.Dispose();
|
|
|
+ enumerator = null;
|
|
|
+ currentKeyValue = default(TKey);
|
|
|
+ }
|
|
|
+
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ switch (state)
|
|
|
+ {
|
|
|
+ case AsyncIteratorState.Allocated:
|
|
|
+ enumerator = source.GetEnumerator();
|
|
|
+ state = AsyncIteratorState.Iterating;
|
|
|
+ goto case AsyncIteratorState.Iterating;
|
|
|
+
|
|
|
+ case AsyncIteratorState.Iterating:
|
|
|
+ if (await enumerator.MoveNext(cancellationToken)
|
|
|
+ .ConfigureAwait(false))
|
|
|
+ {
|
|
|
+ var item = enumerator.Current;
|
|
|
+ var key = keySelector(item);
|
|
|
+ var comparerEquals = false;
|
|
|
+
|
|
|
+ if (hasCurrentKey)
|
|
|
+ {
|
|
|
+ comparerEquals = comparer.Equals(currentKeyValue, key);
|
|
|
+ }
|
|
|
+ if (!hasCurrentKey || !comparerEquals)
|
|
|
+ {
|
|
|
+ hasCurrentKey = true;
|
|
|
+ currentKeyValue = key;
|
|
|
+ current = item;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ goto case AsyncIteratorState.Iterating; // loop
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ Dispose();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|