// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive.Subjects { /// /// Represents the result of an asynchronous operation. /// The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers. /// /// The type of the elements processed by the subject. public sealed class AsyncSubject : SubjectBase, INotifyCompletion { #region Fields private AsyncSubjectDisposable[] _observers; private T _value; private bool _hasValue; private Exception _exception; /// /// A pre-allocated empty array for the no-observers state. /// private static readonly AsyncSubjectDisposable[] EMPTY = new AsyncSubjectDisposable[0]; /// /// A pre-allocated empty array indicating the AsyncSubject has terminated /// private static readonly AsyncSubjectDisposable[] TERMINATED = new AsyncSubjectDisposable[0]; /// /// A pre-allocated empty array indicating the AsyncSubject has terminated /// private static readonly AsyncSubjectDisposable[] DISPOSED = new AsyncSubjectDisposable[0]; #endregion #region Constructors /// /// Creates a subject that can only receive one value and that value is cached for all future observations. /// public AsyncSubject() { _observers = EMPTY; } #endregion #region Properties /// /// Indicates whether the subject has observers subscribed to it. /// public override bool HasObservers => _observers.Length != 0; /// /// Indicates whether the subject has been disposed. /// public override bool IsDisposed => Volatile.Read(ref _observers) == DISPOSED; #endregion #region Methods #region IObserver implementation /// /// Notifies all subscribed observers about the end of the sequence, also causing the last received value to be sent out (if any). /// public override void OnCompleted() { for (; ; ) { var observers = Volatile.Read(ref _observers); if (observers == DISPOSED) { _exception = null; ThrowDisposed(); break; } if (observers == TERMINATED) { break; } if (Interlocked.CompareExchange(ref _observers, TERMINATED, observers) == observers) { var hasValue = _hasValue; if (hasValue) { var value = _value; foreach (var o in observers) { if (!o.IsDisposed()) { o.Downstream.OnNext(value); o.Downstream.OnCompleted(); } } } else { foreach (var o in observers) { if (!o.IsDisposed()) { o.Downstream.OnCompleted(); } } } } } } /// /// Notifies all subscribed observers about the exception. /// /// The exception to send to all observers. /// is null. public override void OnError(Exception error) { if (error == null) { throw new ArgumentNullException(nameof(error)); } for (; ; ) { var observers = Volatile.Read(ref _observers); if (observers == DISPOSED) { _exception = null; _value = default; ThrowDisposed(); break; } if (observers == TERMINATED) { break; } _exception = error; if (Interlocked.CompareExchange(ref _observers, TERMINATED, observers) == observers) { foreach (var o in observers) { if (!o.IsDisposed()) { o.Downstream.OnError(error); } } } } } /// /// Sends a value to the subject. The last value received before successful termination will be sent to all subscribed and future observers. /// /// The value to store in the subject. public override void OnNext(T value) { var observers = Volatile.Read(ref _observers); if (observers == DISPOSED) { _value = default; _exception = null; ThrowDisposed(); return; } if (observers == TERMINATED) { return; } _value = value; _hasValue = true; } #endregion #region IObservable implementation /// /// Subscribes an observer to the subject. /// /// Observer to subscribe to the subject. /// Disposable object that can be used to unsubscribe the observer from the subject. /// is null. public override IDisposable Subscribe(IObserver observer) { if (observer == null) { throw new ArgumentNullException(nameof(observer)); } var parent = new AsyncSubjectDisposable(this, observer); if (!Add(parent)) { var ex = _exception; if (ex != null) { observer.OnError(ex); } else { if (_hasValue) { observer.OnNext(_value); } observer.OnCompleted(); } return Disposable.Empty; } return parent; } private bool Add(AsyncSubjectDisposable inner) { for (; ; ) { var a = Volatile.Read(ref _observers); if (a == DISPOSED) { _value = default; _exception = null; ThrowDisposed(); return true; } if (a == TERMINATED) { return false; } var n = a.Length; var b = new AsyncSubjectDisposable[n + 1]; Array.Copy(a, 0, b, 0, n); b[n] = inner; if (Interlocked.CompareExchange(ref _observers, b, a) == a) { return true; } } } private void Remove(AsyncSubjectDisposable inner) { for (; ; ) { var a = Volatile.Read(ref _observers); var n = a.Length; if (n == 0) { break; } var j = -1; for (var i = 0; i < n; i++) { if (a[i] == inner) { j = i; break; } } if (j < 0) { break; } var b = default(AsyncSubjectDisposable[]); if (n == 1) { b = EMPTY; } else { b = new AsyncSubjectDisposable[n - 1]; Array.Copy(a, 0, b, 0, j); Array.Copy(a, j + 1, b, j, n - j - 1); } if (Interlocked.CompareExchange(ref _observers, b, a) == a) { break; } } } /// /// A disposable connecting the AsyncSubject and an IObserver. /// private sealed class AsyncSubjectDisposable : IDisposable { internal readonly IObserver Downstream; private AsyncSubject _parent; public AsyncSubjectDisposable(AsyncSubject parent, IObserver downstream) { _parent = parent; Downstream = downstream; } public void Dispose() { Interlocked.Exchange(ref _parent, null)?.Remove(this); } internal bool IsDisposed() { return Volatile.Read(ref _parent) == null; } } #endregion #region IDisposable implementation private void ThrowDisposed() { throw new ObjectDisposedException(string.Empty); } /// /// Unsubscribe all observers and release resources. /// public override void Dispose() { if (Interlocked.Exchange(ref _observers, DISPOSED) != DISPOSED) { _exception = null; _value = default; _hasValue = false; } } #endregion #region Await support /// /// Gets an awaitable object for the current AsyncSubject. /// /// Object that can be awaited. public AsyncSubject GetAwaiter() => this; /// /// Specifies a callback action that will be invoked when the subject completes. /// /// Callback action that will be invoked when the subject completes. /// is null. public void OnCompleted(Action continuation) { if (continuation == null) { throw new ArgumentNullException(nameof(continuation)); } OnCompleted(continuation, originalContext: true); } private void OnCompleted(Action continuation, bool originalContext) { // // [OK] Use of unsafe Subscribe: this type's Subscribe implementation is safe. // Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext)); } private sealed class AwaitObserver : IObserver { private readonly SynchronizationContext _context; private readonly Action _callback; public AwaitObserver(Action callback, bool originalContext) { if (originalContext) { _context = SynchronizationContext.Current; } _callback = callback; } public void OnCompleted() => InvokeOnOriginalContext(); public void OnError(Exception error) => InvokeOnOriginalContext(); public void OnNext(T value) { } private void InvokeOnOriginalContext() { if (_context != null) { // // No need for OperationStarted and OperationCompleted calls here; // this code is invoked through await support and will have a way // to observe its start/complete behavior, either through returned // Task objects or the async method builder's interaction with the // SynchronizationContext object. // _context.Post(c => ((Action)c)(), _callback); } else { _callback(); } } } /// /// Gets whether the AsyncSubject has completed. /// public bool IsCompleted => Volatile.Read(ref _observers) == TERMINATED; /// /// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally. /// /// The last element of the subject. Throws an InvalidOperationException if no element was received. /// The source sequence is empty. [Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Await pattern for C# and VB compilers.")] public T GetResult() { if (Volatile.Read(ref _observers) != TERMINATED) { var e = new ManualResetEvent(initialState: false); OnCompleted(() => e.Set(), originalContext: false); e.WaitOne(); } _exception.ThrowIfNotNull(); if (!_hasValue) { throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS); } return _value; } #endregion #endregion } }