// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. #if !NO_PERF using System.Threading; namespace System.Reactive { /// /// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer. /// /// Type of the resulting sequence's elements. /// Implementations of sinks are responsible to enforce the message grammar on the associated observer. Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of related resources and to mute the outgoing observer. internal abstract class Sink : IDisposable { protected internal volatile IObserver _observer; private IDisposable _cancel; public Sink(IObserver observer, IDisposable cancel) { _observer = observer; _cancel = cancel; } public virtual void Dispose() { _observer = NopObserver.Instance; Interlocked.Exchange(ref _cancel, null)?.Dispose(); } public IObserver GetForwarder() => new _(this); private sealed class _ : IObserver { private readonly Sink _forward; public _(Sink forward) { _forward = forward; } public void OnNext(TSource value) { _forward._observer.OnNext(value); } public void OnError(Exception error) { _forward._observer.OnError(error); _forward.Dispose(); } public void OnCompleted() { _forward._observer.OnCompleted(); _forward.Dispose(); } } } } #endif