#if !NO_PERF
using System.Threading;
namespace System.Reactive
{
    /// <summary>
    /// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer.
    /// </summary>
    /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
    /// <remarks>
    /// Implementations of sinks are responsible to enforce the message grammar on the associated observer.
    /// Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of
    /// related resources and to mute the outgoing observer.
    /// </remarks>
    internal abstract class Sink<TSource> : IDisposable
    {
        /// <summary>
        /// Observer that currently receives the sink's messages. <see cref="Dispose"/> overwrites it with
        /// <c>NopObserver&lt;TSource&gt;.Instance</c>; the field is volatile so the replacement is observed
        /// by subsequent reads from other threads.
        /// </summary>
        protected volatile IObserver<TSource> _observer;

        // Not readonly: Dispose atomically swaps this out for null so it is disposed at most once.
        private IDisposable _cancel;

        /// <summary>
        /// Creates a sink that sends messages to <paramref name="observer"/> and disposes
        /// <paramref name="cancel"/> when the sink itself is disposed.
        /// </summary>
        /// <param name="observer">Observer to send messages to until the sink is disposed.</param>
        /// <param name="cancel">Resource to dispose along with the sink; a null value is tolerated by <see cref="Dispose"/>.</param>
        public Sink(IObserver<TSource> observer, IDisposable cancel)
        {
            _observer = observer;
            _cancel = cancel;
        }

        /// <summary>
        /// Replaces the outgoing observer with <c>NopObserver&lt;TSource&gt;.Instance</c> and disposes the
        /// cancellation resource passed to the constructor. Calling it again only repeats the observer
        /// replacement; the cancellation resource is disposed by the first call alone.
        /// </summary>
        public virtual void Dispose()
        {
            // Swap the observer first, so messages arriving while the resource is being disposed no longer reach the original one.
            _observer = NopObserver<TSource>.Instance;

            // The atomic exchange hands the resource to exactly one caller, even under concurrent Dispose calls.
            var cancel = Interlocked.Exchange(ref _cancel, null);
            if (cancel != null)
            {
                cancel.Dispose();
            }
        }

        /// <summary>
        /// Creates an observer that relays every message to this sink's current observer and disposes the
        /// sink after relaying a terminal (OnError / OnCompleted) message.
        /// </summary>
        /// <returns>A new forwarding observer bound to this sink.</returns>
        public IObserver<TSource> GetForwarder()
        {
            return new _(this);
        }

        /// <summary>
        /// Forwarding observer handed out by <see cref="GetForwarder"/>.
        /// </summary>
        private sealed class _ : IObserver<TSource>
        {
            private readonly Sink<TSource> _forward;

            public _(Sink<TSource> forward)
            {
                _forward = forward;
            }

            public void OnNext(TSource value)
            {
                // Reads _observer on every call, so a disposed sink routes to whatever Dispose installed.
                _forward._observer.OnNext(value);
            }

            public void OnError(Exception error)
            {
                // Terminal message: relay it, then dispose the sink.
                _forward._observer.OnError(error);
                _forward.Dispose();
            }

            public void OnCompleted()
            {
                // Terminal message: relay it, then dispose the sink.
                _forward._observer.OnCompleted();
                _forward.Dispose();
            }
        }
    }
}
#endif