// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that collects every element of the source sequence whose key,
    /// as computed by the key selector, is minimal according to the comparer.
    /// The collected elements are emitted as a single list when the source completes.
    /// </summary>
    /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
    /// <typeparam name="TKey">Type of the key computed for each element.</typeparam>
    // NOTE(review): the generic argument lists were reconstructed from usage inside this
    // file (key selector call, comparer call, list operations). The result type
    // IList<TSource> and Action<IDisposable> should be confirmed against the Producer/Sink
    // base classes, which are declared elsewhere.
    class MinBy<TSource, TKey> : Producer<IList<TSource>>
    {
        private readonly IObservable<TSource> _source;
        private readonly Func<TSource, TKey> _keySelector;
        private readonly IComparer<TKey> _comparer;

        /// <summary>
        /// Captures the source sequence, the key selector and the key comparer.
        /// No validation is performed here; the arguments are stored as given.
        /// </summary>
        /// <param name="source">Sequence whose minimal-key elements are collected.</param>
        /// <param name="keySelector">Function computing the key of each element.</param>
        /// <param name="comparer">Comparer used to order keys.</param>
        public MinBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
        {
            _source = source;
            _keySelector = keySelector;
            _comparer = comparer;
        }

        /// <summary>
        /// Creates the sink, hands it to <paramref name="setSink"/>, and subscribes it to the source.
        /// </summary>
        /// <param name="observer">Observer receiving the resulting list.</param>
        /// <param name="cancel">Disposable passed through to the sink.</param>
        /// <param name="setSink">Callback that receives the created sink.</param>
        /// <returns>The subscription to the source sequence.</returns>
        protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(this, observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>
        /// Observer that tracks the smallest key seen so far together with all
        /// elements that produced that key.
        /// </summary>
        class _ : Sink<IList<TSource>>, IObserver<TSource>
        {
            private readonly MinBy<TSource, TKey> _parent;
            private readonly List<TSource> _list;
            private bool _hasValue;
            private TKey _lastKey;

            public _(MinBy<TSource, TKey> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;

                _hasValue = false;
                _lastKey = default(TKey);
                _list = new List<TSource>();
            }

            /// <summary>
            /// Computes the key of <paramref name="value"/> and updates the running minimum.
            /// An exception thrown by the key selector or the comparer is forwarded to the
            /// downstream observer via OnError, after which the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                var key = default(TKey);
                try
                {
                    key = _parent._keySelector(value);
                }
                catch (Exception ex)
                {
                    base._observer.OnError(ex);
                    base.Dispose();
                    return;
                }

                // Stays 0 for the very first element, so that element is always recorded below.
                var comparison = 0;

                if (!_hasValue)
                {
                    _hasValue = true;
                    _lastKey = key;
                }
                else
                {
                    try
                    {
                        comparison = _parent._comparer.Compare(key, _lastKey);
                    }
                    catch (Exception ex)
                    {
                        base._observer.OnError(ex);
                        base.Dispose();
                        return;
                    }
                }

                // Strictly smaller key: it becomes the new minimum and earlier candidates are dropped.
                if (comparison < 0)
                {
                    _lastKey = key;
                    _list.Clear();
                }

                // Smaller or equal key: the element belongs to the current set of minima.
                if (comparison <= 0)
                {
                    _list.Add(value);
                }
            }

            /// <summary>
            /// Forwards the error downstream and disposes the sink.
            /// </summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>
            /// Emits the collected list (empty when no element was received),
            /// signals completion, and disposes the sink.
            /// </summary>
            public void OnCompleted()
            {
                base._observer.OnNext(_list);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }
}