DistinctUntilChanged.cs 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  /// <summary>
  /// Producer that relays an element of the source sequence only when the key
  /// computed for it is not equal to the key of the most recently relayed element.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
  class DistinctUntilChanged<TSource, TKey> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, TKey> _keySelector;
      private readonly IEqualityComparer<TKey> _comparer;

      /// <summary>
      /// Stores the source sequence, the key selector and the key comparer for later subscriptions.
      /// </summary>
      /// <param name="source">Sequence whose elements are filtered.</param>
      /// <param name="keySelector">Function that computes the comparison key of an element.</param>
      /// <param name="comparer">Comparer used to test two consecutive keys for equality.</param>
      public DistinctUntilChanged(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
      {
          _source = source;
          _keySelector = keySelector;
          _comparer = comparer;
      }

      /// <summary>
      /// Creates the filtering sink, hands it to <paramref name="setSink"/>, and subscribes it to the source.
      /// </summary>
      /// <param name="observer">Downstream observer that receives the filtered elements.</param>
      /// <param name="cancel">Disposable passed through to the sink's base constructor.</param>
      /// <param name="setSink">Callback that receives the newly created sink.</param>
      /// <returns>The disposable returned by subscribing the sink to the source.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var filteringSink = new _(this, observer, cancel);
          setSink(filteringSink);
          return _source.SubscribeSafe(filteringSink);
      }

      /// <summary>
      /// Observer that remembers the key of the last relayed element and drops
      /// elements whose key compares equal to it.
      /// </summary>
      class _ : Sink<TSource>, IObserver<TSource>
      {
          private readonly DistinctUntilChanged<TSource, TKey> _parent;

          // Key of the most recently relayed element; meaningful only once _hasCurrentKey is true.
          private TKey _currentKey;

          // False until the first element has been relayed.
          private bool _hasCurrentKey;

          /// <summary>
          /// Creates a sink bound to the given parent operator and downstream observer.
          /// </summary>
          /// <param name="parent">Operator instance supplying the key selector and comparer.</param>
          /// <param name="observer">Downstream observer, forwarded to the base constructor.</param>
          /// <param name="cancel">Disposable forwarded to the base constructor.</param>
          public _(DistinctUntilChanged<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              // _currentKey and _hasCurrentKey keep their field defaults
              // (default(TKey) and false), i.e. "no key seen yet".
              _parent = parent;
          }

          /// <summary>
          /// Computes the key of <paramref name="value"/> and relays the element downstream
          /// unless its key equals the previously stored key. An exception thrown by the key
          /// selector or the comparer is reported through OnError and the sink is disposed.
          /// </summary>
          /// <param name="value">Element received from the source.</param>
          public void OnNext(TSource value)
          {
              TKey candidateKey;
              try
              {
                  candidateKey = _parent._keySelector(value);
              }
              catch (Exception selectorFailure)
              {
                  Fail(selectorFailure);
                  return;
              }

              // The comparer is consulted only when a previous key exists;
              // the very first element is always relayed.
              if (_hasCurrentKey)
              {
                  bool sameAsPrevious;
                  try
                  {
                      sameAsPrevious = _parent._comparer.Equals(_currentKey, candidateKey);
                  }
                  catch (Exception comparerFailure)
                  {
                      Fail(comparerFailure);
                      return;
                  }

                  if (sameAsPrevious)
                  {
                      return;
                  }
              }

              // Record the new key before notifying the downstream observer.
              _hasCurrentKey = true;
              _currentKey = candidateKey;
              base._observer.OnNext(value);
          }

          /// <summary>
          /// Forwards the error downstream and disposes the sink.
          /// </summary>
          /// <param name="error">Error received from the source.</param>
          public void OnError(Exception error)
          {
              base._observer.OnError(error);
              base.Dispose();
          }

          /// <summary>
          /// Forwards completion downstream and disposes the sink.
          /// </summary>
          public void OnCompleted()
          {
              base._observer.OnCompleted();
              base.Dispose();
          }

          // Reports a failure raised while processing an element, then disposes the sink.
          private void Fail(Exception failure)
          {
              base._observer.OnError(failure);
              base.Dispose();
          }
      }
  }
  82. }
  83. #endif