Distinct.cs 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Collections.Generic;
  7. namespace System.Reactive.Linq.ObservableImpl
  8. {
  /// <summary>
  /// Producer that relays only those elements of the source sequence whose key,
  /// as computed by the key selector, has not been observed earlier in the same
  /// subscription. Key equality is decided by the supplied comparer.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
  class Distinct<TSource, TKey> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, TKey> _keySelector;
      private readonly IEqualityComparer<TKey> _comparer;

      /// <summary>
      /// Captures the source sequence, the key selector and the key comparer.
      /// </summary>
      /// <param name="source">Sequence to subscribe to when the producer runs.</param>
      /// <param name="keySelector">Function that computes the key for each element.</param>
      /// <param name="comparer">Comparer handed to the set that tracks seen keys.</param>
      public Distinct(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
      {
          _source = source;
          _keySelector = keySelector;
          _comparer = comparer;
      }

      /// <summary>
      /// Creates a sink for this subscription, publishes it through
      /// <paramref name="setSink"/> and subscribes it to the source sequence.
      /// </summary>
      /// <param name="observer">Observer that receives the filtered elements.</param>
      /// <param name="cancel">Disposable passed on to the sink.</param>
      /// <param name="setSink">Callback that receives the newly created sink.</param>
      /// <returns>The subscription to the source sequence.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var distinctSink = new _(this, observer, cancel);
          setSink(distinctSink);

          return _source.SubscribeSafe(distinctSink);
      }

      /// <summary>
      /// Per-subscription sink; owns the set of keys seen so far.
      /// </summary>
      class _ : Sink<TSource>, IObserver<TSource>
      {
          private readonly Distinct<TSource, TKey> _parent;
          private readonly HashSet<TKey> _hashSet;

          /// <summary>
          /// Creates the sink with an empty key set that uses the parent's comparer.
          /// </summary>
          public _(Distinct<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
              _hashSet = new HashSet<TKey>(parent._comparer);
          }

          /// <summary>
          /// Forwards <paramref name="value"/> only when its key was not in the set yet.
          /// </summary>
          public void OnNext(TSource value)
          {
              bool isFirstOccurrence;

              try
              {
                  // Both the key selector and the set insertion run inside the try block,
                  // so an exception from either is reported to the observer.
                  isFirstOccurrence = _hashSet.Add(_parent._keySelector(value));
              }
              catch (Exception failure)
              {
                  _observer.OnError(failure);
                  Dispose();
                  return;
              }

              if (!isFirstOccurrence)
              {
                  return;
              }

              _observer.OnNext(value);
          }

          /// <summary>
          /// Forwards the error to the observer, then disposes the sink.
          /// </summary>
          public void OnError(Exception error)
          {
              _observer.OnError(error);
              Dispose();
          }

          /// <summary>
          /// Forwards completion to the observer, then disposes the sink.
          /// </summary>
          public void OnCompleted()
          {
              _observer.OnCompleted();
              Dispose();
          }
      }
  }
  66. }
  67. #endif