|
|
@@ -99,7 +99,7 @@ namespace System.Reactive.Linq
|
|
|
if (keySelector == null)
|
|
|
throw new ArgumentNullException(nameof(keySelector));
|
|
|
|
|
|
- return DistinctUntilChanged(observer, x => Task.FromResult(keySelector(x)), EqualityComparer<TKey>.Default);
|
|
|
+ return DistinctUntilChanged(observer, keySelector, EqualityComparer<TKey>.Default);
|
|
|
}
|
|
|
|
|
|
public static IAsyncObserver<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObserver<TSource> observer, Func<TSource, Task<TKey>> keySelector)
|
|
|
@@ -121,7 +121,50 @@ namespace System.Reactive.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return DistinctUntilChanged(observer, x => Task.FromResult(keySelector(x)), comparer);
|
|
|
+ var hasCurrentKey = false;
|
|
|
+ var currentKey = default(TKey);
|
|
|
+
|
|
|
+ return Create<TSource>(
|
|
|
+ async x =>
|
|
|
+ {
|
|
|
+ var key = default(TKey);
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ key = keySelector(x);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ await observer.OnErrorAsync(ex).ConfigureAwait(false);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ var equals = default(bool);
|
|
|
+
|
|
|
+ if (hasCurrentKey)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ equals = comparer.Equals(currentKey, key);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ await observer.OnErrorAsync(ex).ConfigureAwait(false);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!hasCurrentKey || !equals)
|
|
|
+ {
|
|
|
+ hasCurrentKey = true;
|
|
|
+ currentKey = key;
|
|
|
+
|
|
|
+ await observer.OnNextAsync(x).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ observer.OnErrorAsync,
|
|
|
+ observer.OnCompletedAsync
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
public static IAsyncObserver<TSource> DistinctUntilChanged<TSource, TKey>(IAsyncObserver<TSource> observer, Func<TSource, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
|