// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Reactive.Disposables; namespace System.Reactive.Subjects { /// /// Represents a value that changes over time. /// Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications. /// /// The type of the elements processed by the subject. public sealed class BehaviorSubject : ISubject, IDisposable { /// /// Gets the current value or throws an exception. /// /// The initial value passed to the constructor until is called; after which, the last value passed to . /// /// is frozen after is called. /// After is called, always throws the specified exception. /// An exception is always thrown after is called. /// /// Reading is a thread-safe operation, though there's a potential race condition when or are being invoked concurrently. /// In some cases, it may be necessary for a caller to use external synchronization to avoid race conditions. /// /// /// Dispose was called. public T Value { get { lock (_gate) { CheckDisposed(); if (_exception != null) { throw _exception; } return _value; } } } private readonly object _gate = new object(); private ImmutableList> _observers; private bool _isStopped; private T _value; private Exception _exception; private bool _isDisposed; /// /// Initializes a new instance of the class which creates a subject that caches its last value and starts with the specified value. /// /// Initial value sent to observers when no other value has been received by the subject yet. public BehaviorSubject(T value) { _value = value; _observers = new ImmutableList>(); } /// /// Indicates whether the subject has observers subscribed to it. /// public bool HasObservers { get { var observers = _observers; return observers != null && observers.Data.Length > 0; } } /// /// Notifies all subscribed observers about the end of the sequence. /// public void OnCompleted() { var os = default(IObserver[]); lock (_gate) { CheckDisposed(); if (!_isStopped) { os = _observers.Data; _observers = new ImmutableList>(); _isStopped = true; } } if (os != null) { foreach (var o in os) o.OnCompleted(); } } /// /// Notifies all subscribed observers about the exception. /// /// The exception to send to all observers. /// is null. public void OnError(Exception error) { if (error == null) throw new ArgumentNullException("error"); var os = default(IObserver[]); lock (_gate) { CheckDisposed(); if (!_isStopped) { os = _observers.Data; _observers = new ImmutableList>(); _isStopped = true; _exception = error; } } if (os != null) foreach (var o in os) o.OnError(error); } /// /// Notifies all subscribed observers about the arrival of the specified element in the sequence. /// /// The value to send to all observers. public void OnNext(T value) { var os = default(IObserver[]); lock (_gate) { CheckDisposed(); if (!_isStopped) { _value = value; os = _observers.Data; } } if (os != null) { foreach (var o in os) o.OnNext(value); } } /// /// Subscribes an observer to the subject. /// /// Observer to subscribe to the subject. /// Disposable object that can be used to unsubscribe the observer from the subject. /// is null. public IDisposable Subscribe(IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); var ex = default(Exception); lock (_gate) { CheckDisposed(); if (!_isStopped) { _observers = _observers.Add(observer); observer.OnNext(_value); return new Subscription(this, observer); } ex = _exception; } if (ex != null) observer.OnError(ex); else observer.OnCompleted(); return Disposable.Empty; } class Subscription : IDisposable { private readonly BehaviorSubject _subject; private IObserver _observer; public Subscription(BehaviorSubject subject, IObserver observer) { _subject = subject; _observer = observer; } public void Dispose() { if (_observer != null) { lock (_subject._gate) { if (!_subject._isDisposed && _observer != null) { _subject._observers = _subject._observers.Remove(_observer); _observer = null; } } } } } void CheckDisposed() { if (_isDisposed) throw new ObjectDisposedException(string.Empty); } /// /// Unsubscribe all observers and release resources. /// public void Dispose() { lock (_gate) { _isDisposed = true; _observers = null; _value = default(T); _exception = null; } } } }