// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;

namespace System.Reactive.Linq
{
    partial class AsyncObservable
    {
        /// <summary>
        /// Returns a sequence that forwards only the elements of <paramref name="source"/> that have not
        /// been seen before, comparing elements with the default equality comparer for
        /// <typeparamref name="TSource"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the sequence.</typeparam>
        /// <param name="source">Sequence whose duplicate elements are dropped.</param>
        /// <returns>A sequence that subscribes to <paramref name="source"/> through a distinct-filtering observer.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
        public static IAsyncObservable<TSource> Distinct<TSource>(IAsyncObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));

            // The filtering observer (and therefore its set of seen elements) is built inside the
            // subscription callback, so it is created anew each time the callback runs.
            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Distinct(observer)));
        }

        /// <summary>
        /// Returns a sequence that forwards only the elements of <paramref name="source"/> that have not
        /// been seen before, comparing elements with <paramref name="comparer"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the sequence.</typeparam>
        /// <param name="source">Sequence whose duplicate elements are dropped.</param>
        /// <param name="comparer">Comparer that decides whether two elements are equal.</param>
        /// <returns>A sequence that subscribes to <paramref name="source"/> through a distinct-filtering observer.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="comparer"/> is <c>null</c>.
        /// </exception>
        public static IAsyncObservable<TSource> Distinct<TSource>(IAsyncObservable<TSource> source, IEqualityComparer<TSource> comparer)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (comparer == null)
                throw new ArgumentNullException(nameof(comparer));

            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Distinct(observer, comparer)));
        }
    }

    partial class AsyncObserver
    {
        /// <summary>
        /// Wraps <paramref name="observer"/> so that it only receives elements that have not been seen
        /// before, comparing elements with <see cref="EqualityComparer{T}.Default"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer that receives the distinct elements.</param>
        /// <returns>An observer that filters duplicates before forwarding to <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public static IAsyncObserver<TSource> Distinct<TSource>(IAsyncObserver<TSource> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            return Distinct(observer, EqualityComparer<TSource>.Default);
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> so that it only receives elements that have not been seen
        /// before, comparing elements with <paramref name="comparer"/>.
        /// </summary>
        /// <remarks>
        /// Every element that was forwarded is retained in a <see cref="HashSet{T}"/> owned by the returned
        /// observer. Error and completion notifications are passed straight through to
        /// <paramref name="observer"/>.
        /// </remarks>
        /// <typeparam name="TSource">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer that receives the distinct elements.</param>
        /// <param name="comparer">Comparer that decides whether two elements are equal.</param>
        /// <returns>An observer that filters duplicates before forwarding to <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observer"/> or <paramref name="comparer"/> is <c>null</c>.
        /// </exception>
        public static IAsyncObserver<TSource> Distinct<TSource>(IAsyncObserver<TSource> observer, IEqualityComparer<TSource> comparer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (comparer == null)
                throw new ArgumentNullException(nameof(comparer));

            // Elements already forwarded downstream; membership is decided by the supplied comparer.
            var set = new HashSet<TSource>(comparer);

            return Create<TSource>(
                async x =>
                {
                    var added = false;

                    try
                    {
                        // Add invokes the caller-supplied comparer, which may throw.
                        added = set.Add(x);
                    }
                    catch (Exception ex)
                    {
                        // Report the failure downstream as an error notification and drop this element.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    // Add returns false for an element that is already in the set; such duplicates are skipped.
                    if (added)
                    {
                        await observer.OnNextAsync(x).ConfigureAwait(false);
                    }
                },
                observer.OnErrorAsync,
                observer.OnCompletedAsync
            );
        }
    }
}