// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; namespace System.Reactive { internal interface ISink { void ForwardOnNext(TTarget value); void ForwardOnCompleted(); void ForwardOnError(Exception error); } internal abstract class Sink : ISink, IDisposable { private IDisposable _upstream; private volatile IObserver _observer; protected Sink(IObserver observer) { _observer = observer; } public void Dispose() { Dispose(true); } protected virtual void Dispose(bool disposing) { _observer = NopObserver.Instance; Disposable.TryDispose(ref _upstream); } public void ForwardOnNext(TTarget value) { _observer.OnNext(value); } public void ForwardOnCompleted() { _observer.OnCompleted(); Dispose(); } public void ForwardOnError(Exception error) { _observer.OnError(error); Dispose(); } protected void SetUpstream(IDisposable upstream) { Disposable.SetSingle(ref _upstream, upstream); } protected void DisposeUpstream() { Disposable.TryDispose(ref _upstream); } } /// /// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer. /// /// Type of the resulting sequence's elements. /// /// Implementations of sinks are responsible to enforce the message grammar on the associated observer. Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of related resources and to mute the outgoing observer. internal abstract class Sink : Sink, IObserver { protected Sink(IObserver observer) : base(observer) { } public virtual void Run(IObservable source) { SetUpstream(source.SubscribeSafe(this)); } public abstract void OnNext(TSource value); public virtual void OnError(Exception error) { ForwardOnError(error); } public virtual void OnCompleted() { ForwardOnCompleted(); } public IObserver GetForwarder() => new _(this); private sealed class _ : IObserver { private readonly Sink _forward; public _(Sink forward) { _forward = forward; } public void OnNext(TTarget value) { _forward.ForwardOnNext(value); } public void OnError(Exception error) { _forward.ForwardOnError(error); } public void OnCompleted() { _forward.ForwardOnCompleted(); } } } }