// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information. 
namespace System.Reactive.Linq.ObservableImpl
{
    /// 
    /// Relays items to the downstream until the predicate returns true.
    /// 
    /// The element type of the sequence
    internal sealed class TakeUntilPredicate : 
        Producer.TakeUntilPredicateObserver>
    {
        readonly IObservable _source;
        readonly Func _stopPredicate;
        public TakeUntilPredicate(IObservable source, Func stopPredicate)
        {
            _source = source;
            _stopPredicate = stopPredicate;
        }
        protected override TakeUntilPredicateObserver CreateSink(IObserver observer) => new TakeUntilPredicateObserver(observer, _stopPredicate);
        protected override void Run(TakeUntilPredicateObserver sink) => sink.Run(_source);
        internal sealed class TakeUntilPredicateObserver : IdentitySink
        {
            readonly Func _stopPredicate;
            public TakeUntilPredicateObserver(IObserver downstream, 
                Func predicate) : base (downstream)
            {
                _stopPredicate = predicate;
            }
            public override void OnCompleted()
            {
                ForwardOnCompleted();
            }
            public override void OnError(Exception error)
            {
                ForwardOnError(error);
            }
            public override void OnNext(TSource value)
            {
                ForwardOnNext(value);
                var shouldStop = false;
                try
                {
                    shouldStop = _stopPredicate(value);
                }
                catch (Exception ex)
                {
                    ForwardOnError(ex);
                    return;
                }
                if (shouldStop)
                {
                    ForwardOnCompleted();
                }
            }
        }
    }
}