| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396 | // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.using System.Reactive.Disposables;using System.Threading;using System.Runtime.CompilerServices;using System.Reactive.Concurrency;namespace System.Reactive.Subjects{    /// <summary>    /// Represents the result of an asynchronous operation.    /// The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.    /// </summary>    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>    public sealed class AsyncSubject<T> : SubjectBase<T>, IDisposable#if HAS_AWAIT        , INotifyCompletion#endif    {        #region Fields        private readonly object _gate = new object();        private ImmutableList<IObserver<T>> _observers;        private bool _isDisposed;        private bool _isStopped;        private T _value;        private bool _hasValue;        private Exception _exception;        #endregion        #region Constructors        /// <summary>        /// Creates a subject that can only receive one value and that value is cached for all future observations.        /// </summary>        public AsyncSubject()        {            _observers = ImmutableList<IObserver<T>>.Empty;        }        #endregion        #region Properties        /// <summary>        /// Indicates whether the subject has observers subscribed to it.        /// </summary>        public override bool HasObservers        {            get            {                var observers = _observers;                return observers != null && observers.Data.Length > 0;            }        }        /// <summary>        /// Indicates whether the subject has been disposed.        /// </summary>        public override bool IsDisposed        {            get            {                lock (_gate)                {                    return _isDisposed;                }            }        }        #endregion        #region Methods        #region IObserver<T> implementation        /// <summary>        /// Notifies all subscribed observers about the end of the sequence, also causing the last received value to be sent out (if any).        /// </summary>        public override void OnCompleted()        {            var os = default(IObserver<T>[]);            var v = default(T);            var hv = false;            lock (_gate)            {                CheckDisposed();                if (!_isStopped)                {                    os = _observers.Data;                    _observers = ImmutableList<IObserver<T>>.Empty;                    _isStopped = true;                    v = _value;                    hv = _hasValue;                }            }            if (os != null)            {                if (hv)                {                    foreach (var o in os)                    {                        o.OnNext(v);                        o.OnCompleted();                    }                }                else                    foreach (var o in os)                        o.OnCompleted();            }        }        /// <summary>        /// Notifies all subscribed observers about the exception.        /// </summary>        /// <param name="error">The exception to send to all observers.</param>        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>        public override void OnError(Exception error)        {            if (error == null)                throw new ArgumentNullException("error");            var os = default(IObserver<T>[]);            lock (_gate)            {                CheckDisposed();                if (!_isStopped)                {                    os = _observers.Data;                    _observers = ImmutableList<IObserver<T>>.Empty;                    _isStopped = true;                    _exception = error;                }            }            if (os != null)                foreach (var o in os)                    o.OnError(error);        }        /// <summary>        /// Sends a value to the subject. The last value received before successful termination will be sent to all subscribed and future observers.        /// </summary>        /// <param name="value">The value to store in the subject.</param>        public override void OnNext(T value)        {            lock (_gate)            {                CheckDisposed();                if (!_isStopped)                {                    _value = value;                    _hasValue = true;                }            }        }        #endregion        #region IObservable<T> implementation        /// <summary>        /// Subscribes an observer to the subject.        /// </summary>        /// <param name="observer">Observer to subscribe to the subject.</param>        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>        public override IDisposable Subscribe(IObserver<T> observer)        {            if (observer == null)                throw new ArgumentNullException("observer");            var ex = default(Exception);            var v = default(T);            var hv = false;            lock (_gate)            {                CheckDisposed();                if (!_isStopped)                {                    _observers = _observers.Add(observer);                    return new Subscription(this, observer);                }                ex = _exception;                hv = _hasValue;                v = _value;            }            if (ex != null)            {                observer.OnError(ex);            }            else if (hv)            {                observer.OnNext(v);                observer.OnCompleted();            }            else            {                observer.OnCompleted();            }            return Disposable.Empty;        }        class Subscription : IDisposable        {            private readonly AsyncSubject<T> _subject;            private IObserver<T> _observer;            public Subscription(AsyncSubject<T> subject, IObserver<T> observer)            {                _subject = subject;                _observer = observer;            }            public void Dispose()            {                if (_observer != null)                {                    lock (_subject._gate)                    {                        if (!_subject._isDisposed && _observer != null)                        {                            _subject._observers = _subject._observers.Remove(_observer);                            _observer = null;                        }                    }                }            }        }        #endregion        #region IDisposable implementation        void CheckDisposed()        {            if (_isDisposed)                throw new ObjectDisposedException(string.Empty);        }        /// <summary>        /// Unsubscribe all observers and release resources.        /// </summary>        public override void Dispose()        {            lock (_gate)            {                _isDisposed = true;                _observers = null;                _exception = null;                _value = default(T);            }        }        #endregion        #region Await support#if HAS_AWAIT        /// <summary>        /// Gets an awaitable object for the current AsyncSubject.        /// </summary>        /// <returns>Object that can be awaited.</returns>        public AsyncSubject<T> GetAwaiter()        {            return this;        }        /// <summary>        /// Specifies a callback action that will be invoked when the subject completes.        /// </summary>        /// <param name="continuation">Callback action that will be invoked when the subject completes.</param>        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>        public void OnCompleted(Action continuation)        {            if (continuation == null)                throw new ArgumentNullException("continuation");            OnCompleted(continuation, true);        }#endif        private void OnCompleted(Action continuation, bool originalContext)        {            //            // [OK] Use of unsafe Subscribe: this type's Subscribe implementation is safe.            //            this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));        }        class AwaitObserver : IObserver<T>        {#if HAS_AWAIT            private readonly SynchronizationContext _context;#endif            private readonly Action _callback;            public AwaitObserver(Action callback, bool originalContext)            {#if HAS_AWAIT                if (originalContext)                    _context = SynchronizationContext.Current;#else                System.Diagnostics.Debug.Assert(!originalContext);#endif                _callback = callback;            }            public void OnCompleted()            {                InvokeOnOriginalContext();            }            public void OnError(Exception error)            {                InvokeOnOriginalContext();            }            public void OnNext(T value)            {            }            private void InvokeOnOriginalContext()            {#if HAS_AWAIT                if (_context != null)                {                    //                    // No need for OperationStarted and OperationCompleted calls here;                    // this code is invoked through await support and will have a way                    // to observe its start/complete behavior, either through returned                    // Task objects or the async method builder's interaction with the                    // SynchronizationContext object.                    //                    _context.Post(c => ((Action)c)(), _callback);                }                else#endif                {                    _callback();                }            }        }        /// <summary>        /// Gets whether the AsyncSubject has completed.        /// </summary>        public bool IsCompleted        {            get            {                return _isStopped;            }        }        /// <summary>        /// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally.        /// </summary>        /// <returns>The last element of the subject. Throws an InvalidOperationException if no element was received.</returns>        /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Await pattern for C# and VB compilers.")]        public T GetResult()        {            if (!_isStopped)            {                var e = new ManualResetEvent(false);                OnCompleted(() => e.Set(), false);                e.WaitOne();            }            _exception.ThrowIfNotNull();            if (!_hasValue)                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);            return _value;        }        #endregion        #endregion    }}
 |