// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
using System.Threading;
namespace System.Reactive
{
/// <summary>
/// Abstract base class for implementations of the <see cref="IObserver{T}"/> interface.
/// </summary>
/// <remarks>
/// This base class enforces the grammar of observers where <see cref="OnError"/> and
/// <see cref="OnCompleted"/> are terminal messages: once either has been delivered (or the
/// observer has been disposed), further notifications are ignored.
/// </remarks>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
public abstract class ObserverBase<T> : IObserver<T>, IDisposable
{
    // 0 while the observer is still accepting notifications, 1 once it has stopped.
    // Kept as an int so it can be flipped atomically via Interlocked.Exchange.
    private int isStopped;

    /// <summary>
    /// Creates a new observer in a non-stopped state.
    /// </summary>
    protected ObserverBase()
    {
        // isStopped is left at its default value of 0 (not stopped).
    }

    /// <summary>
    /// Notifies the observer of a new element in the sequence.
    /// </summary>
    /// <param name="value">Next element in the sequence.</param>
    /// <remarks>The element is dropped when the observer has already stopped.</remarks>
    public void OnNext(T value)
    {
        if (Volatile.Read(ref isStopped) == 0)
        {
            OnNextCore(value);
        }
    }

    /// <summary>
    /// Implement this method to react to the receival of a new element in the sequence.
    /// </summary>
    /// <param name="value">Next element in the sequence.</param>
    /// <remarks>This method only gets called when the observer hasn't stopped yet.</remarks>
    protected abstract void OnNextCore(T value);

    /// <summary>
    /// Notifies the observer that an exception has occurred.
    /// </summary>
    /// <param name="error">The error that has occurred.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
    public void OnError(Exception error)
    {
        if (error == null)
        {
            throw new ArgumentNullException(nameof(error));
        }

        // Only the caller that flips the flag from 0 to 1 gets to deliver the terminal message.
        if (Interlocked.Exchange(ref isStopped, 1) == 0)
        {
            OnErrorCore(error);
        }
    }

    /// <summary>
    /// Implement this method to react to the occurrence of an exception.
    /// </summary>
    /// <param name="error">The error that has occurred.</param>
    /// <remarks>This method only gets called when the observer hasn't stopped yet, and causes the observer to stop.</remarks>
    [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1716:IdentifiersShouldNotMatchKeywords", MessageId = "Error", Justification = "Same name as in the IObserver definition of OnError in the BCL.")]
    protected abstract void OnErrorCore(Exception error);

    /// <summary>
    /// Notifies the observer of the end of the sequence.
    /// </summary>
    public void OnCompleted()
    {
        // Only the caller that flips the flag from 0 to 1 gets to deliver the terminal message.
        if (Interlocked.Exchange(ref isStopped, 1) == 0)
        {
            OnCompletedCore();
        }
    }

    /// <summary>
    /// Implement this method to react to the end of the sequence.
    /// </summary>
    /// <remarks>This method only gets called when the observer hasn't stopped yet, and causes the observer to stop.</remarks>
    protected abstract void OnCompletedCore();

    /// <summary>
    /// Disposes the observer, causing it to transition to the stopped state.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Core implementation of <see cref="IDisposable"/>.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> if the Dispose call was triggered by the <see cref="IDisposable.Dispose"/> method;
    /// <c>false</c> if it was triggered by the finalizer.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Mark the observer as stopped without invoking any of the *Core callbacks.
            Volatile.Write(ref isStopped, 1);
        }
    }

    /// <summary>
    /// Stops the observer and forwards <paramref name="error"/> to <see cref="OnErrorCore"/>,
    /// unless the observer has already stopped.
    /// </summary>
    /// <param name="error">The error to forward. Unlike <see cref="OnError"/>, this value is not checked for <c>null</c>.</param>
    /// <returns>
    /// <c>true</c> if this call stopped the observer and delivered the error;
    /// <c>false</c> if the observer had already stopped.
    /// </returns>
    internal bool Fail(Exception error)
    {
        if (Interlocked.Exchange(ref isStopped, 1) == 0)
        {
            OnErrorCore(error);
            return true;
        }

        return false;
    }
}
}