// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Threading; namespace System.Reactive { /// /// Abstract base class for implementations of the IObserver<T> interface. /// /// This base class enforces the grammar of observers where OnError and OnCompleted are terminal messages. /// The type of the elements in the sequence. public abstract class ObserverBase : IObserver, IDisposable { private int isStopped; /// /// Creates a new observer in a non-stopped state. /// protected ObserverBase() { isStopped = 0; } /// /// Notifies the observer of a new element in the sequence. /// /// Next element in the sequence. public void OnNext(T value) { if (isStopped == 0) OnNextCore(value); } /// /// Implement this method to react to the receival of a new element in the sequence. /// /// Next element in the sequence. /// This method only gets called when the observer hasn't stopped yet. protected abstract void OnNextCore(T value); /// /// Notifies the observer that an exception has occurred. /// /// The error that has occurred. /// is null. public void OnError(Exception error) { if (error == null) throw new ArgumentNullException("error"); if (Interlocked.Exchange(ref isStopped, 1) == 0) { OnErrorCore(error); } } /// /// Implement this method to react to the occurrence of an exception. /// /// The error that has occurred. /// This method only gets called when the observer hasn't stopped yet, and causes the observer to stop. [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1716:IdentifiersShouldNotMatchKeywords", MessageId = "Error", Justification = "Same name as in the IObserver definition of OnError in the BCL.")] protected abstract void OnErrorCore(Exception error); /// /// Notifies the observer of the end of the sequence. /// public void OnCompleted() { if (Interlocked.Exchange(ref isStopped, 1) == 0) { OnCompletedCore(); } } /// /// Implement this method to react to the end of the sequence. /// /// This method only gets called when the observer hasn't stopped yet, and causes the observer to stop. protected abstract void OnCompletedCore(); /// /// Disposes the observer, causing it to transition to the stopped state. /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// Core implementation of IDisposable. /// /// true if the Dispose call was triggered by the IDisposable.Dispose method; false if it was triggered by the finalizer. protected virtual void Dispose(bool disposing) { if (disposing) { isStopped = 1; } } internal bool Fail(Exception error) { if (Interlocked.Exchange(ref isStopped, 1) == 0) { OnErrorCore(error); return true; } return false; } } }