// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. namespace System.Reactive.Linq.ObservableImpl { /// /// Relays items to the downstream until the predicate returns true. /// /// The element type of the sequence internal sealed class TakeUntilPredicate : Producer.TakeUntilPredicateObserver> { readonly IObservable _source; readonly Func _stopPredicate; public TakeUntilPredicate(IObservable source, Func stopPredicate) { _source = source; _stopPredicate = stopPredicate; } protected override TakeUntilPredicateObserver CreateSink(IObserver observer) => new TakeUntilPredicateObserver(observer, _stopPredicate); protected override void Run(TakeUntilPredicateObserver sink) => sink.Run(_source); internal sealed class TakeUntilPredicateObserver : IdentitySink { readonly Func _stopPredicate; public TakeUntilPredicateObserver(IObserver downstream, Func predicate) : base (downstream) { _stopPredicate = predicate; } public override void OnCompleted() { ForwardOnCompleted(); } public override void OnError(Exception error) { ForwardOnError(error); } public override void OnNext(TSource value) { ForwardOnNext(value); var shouldStop = false; try { shouldStop = _stopPredicate(value); } catch (Exception ex) { ForwardOnError(ex); return; } if (shouldStop) { ForwardOnCompleted(); } } } } }