// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace System.Linq
{
    public static partial class AsyncEnumerable
    {
        /// <summary>
        /// Converts an observable sequence to an async-enumerable sequence.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Observable sequence to convert to an async-enumerable sequence.</param>
        /// <returns>The async-enumerable sequence whose elements are pulled from the given observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source)
        {
            if (source == null)
                throw Error.ArgumentNull(nameof(source));

            return new ObservableAsyncEnumerable<TSource>(source);
        }

        /// <summary>
        /// Async iterator that subscribes itself as the observer of an observable sequence,
        /// buffers the received elements in a queue, and hands them out one by one from
        /// <see cref="MoveNextCore"/>.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        private sealed class ObservableAsyncEnumerable<TSource> : AsyncIterator<TSource>, IObserver<TSource>
        {
            private readonly IObservable<TSource> _source;

            // Elements pushed by OnNext that have not been pulled yet; set to null by Dispose.
            private ConcurrentQueue<TSource>? _values = new ConcurrentQueue<TSource>();

            // Error passed to OnError; rethrown by MoveNextCore once the queue has been drained.
            private Exception? _error;

            // Set (with a volatile write) by OnCompleted and OnError.
            private bool _completed;

            // Hand-off slot between the observer callbacks and the consumer:
            //   null         - nobody is waiting and no notification is pending;
            //   TaskExt.True - a notification arrived while nobody was waiting;
            //   other        - a signal installed by Resume (consumer waiting) or by OnCanceled.
            private TaskCompletionSource<bool>? _signal;

            private IDisposable? _subscription;
            private CancellationTokenRegistration _ctr;

            public ObservableAsyncEnumerable(IObservable<TSource> source) => _source = source;

            public override AsyncIteratorBase<TSource> Clone() => new ObservableAsyncEnumerable<TSource>(_source);

            /// <summary>
            /// Releases the cancellation registration, the subscription and the buffered state,
            /// then defers to the base implementation.
            /// </summary>
            public override ValueTask DisposeAsync()
            {
                Dispose();

                return base.DisposeAsync();
            }

            /// <summary>
            /// Subscribes to the source on the first call, then returns the next buffered element,
            /// waiting for an observer notification whenever the buffer is empty.
            /// </summary>
            /// <returns>
            /// <c>true</c> if an element was dequeued into the current value; <c>false</c> if the
            /// source completed without error and the buffer is empty.
            /// </returns>
            protected override async ValueTask<bool> MoveNextCore()
            {
                //
                // REVIEW: How often should we check? At the very least, we want to prevent
                //         subscribing if cancellation is requested. A case may be made to
                //         check for each iteration, namely because this operator is a bridge
                //         with another interface. However, we also wire up cancellation to
                //         the observable subscription, so there's redundancy here.
                //
                _cancellationToken.ThrowIfCancellationRequested();

                switch (_state)
                {
                    case AsyncIteratorState.Allocated:
                        //
                        // NB: Breaking change to align with lazy nature of async iterators.
                        //
                        //     In previous implementations, the Subscribe call happened during
                        //     the call to GetAsyncEnumerator.
                        //
                        // REVIEW: Confirm this design point. This implementation is compatible
                        //         with an async iterator using "yield return", e.g. subscribing
                        //         to the observable sequence and yielding values out of a local
                        //         queue filled by observer callbacks. However, it departs from
                        //         the dual treatment of Subscribe/GetEnumerator.
                        //
                        _subscription = _source.Subscribe(this);
                        _ctr = _cancellationToken.Register(OnCanceled, state: null);
                        _state = AsyncIteratorState.Iterating;
                        goto case AsyncIteratorState.Iterating;

                    case AsyncIteratorState.Iterating:
                        while (true)
                        {
                            // Read the completion flag BEFORE trying to dequeue: if it is set and
                            // the queue then turns out to be empty, no further element can arrive.
                            var completed = Volatile.Read(ref _completed);

                            if (_values!.TryDequeue(out _current!))
                            {
                                return true;
                            }
                            else if (completed)
                            {
                                var error = _error;

                                if (error != null)
                                {
                                    throw error;
                                }

                                return false;
                            }

                            // Nothing buffered and not completed: wait for the next notification
                            // (or for cancellation, which makes this await throw).
                            await Resume().ConfigureAwait(false);

                            // Clear the consumed signal so the next notification can be recorded.
                            Volatile.Write(ref _signal, null);
                        }
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }

            /// <summary>
            /// Marks the sequence as completed, drops the subscription and notifies the consumer.
            /// </summary>
            public void OnCompleted()
            {
                Volatile.Write(ref _completed, true);

                DisposeSubscription();
                OnNotification();
            }

            /// <summary>
            /// Records the error, marks the sequence as completed, drops the subscription and
            /// notifies the consumer.
            /// </summary>
            /// <param name="error">The error reported by the source.</param>
            public void OnError(Exception error)
            {
                // Store the error before publishing completion; the consumer reads _completed
                // first and _error afterwards.
                _error = error;
                Volatile.Write(ref _completed, true);

                DisposeSubscription();
                OnNotification();
            }

            /// <summary>
            /// Buffers the element (unless the buffer has already been released) and notifies
            /// the consumer.
            /// </summary>
            /// <param name="value">The element produced by the source.</param>
            public void OnNext(TSource value)
            {
                _values?.Enqueue(value);

                OnNotification();
            }

            /// <summary>
            /// Signals that something changed (new element, completion or error), either by
            /// completing the signal currently installed or by leaving a marker for the next wait.
            /// </summary>
            private void OnNotification()
            {
                while (true)
                {
                    var signal = Volatile.Read(ref _signal);

                    // A notification is already pending; nothing more to record.
                    if (signal == TaskExt.True)
                    {
                        return;
                    }

                    // A signal is installed; try to complete it.
                    if (signal != null)
                    {
                        signal.TrySetResult(true);
                        return;
                    }

                    // Nobody is waiting: leave the marker. If the slot changed in the meantime
                    // (CAS failed), loop and re-evaluate.
                    if (Interlocked.CompareExchange(ref _signal, TaskExt.True, null) == null)
                    {
                        return;
                    }
                }
            }

            /// <summary>
            /// Releases the cancellation registration and the subscription, and clears the
            /// buffered elements and the stored error.
            /// </summary>
            private void Dispose()
            {
                _ctr.Dispose();
                DisposeSubscription();

                _values = null;
                _error = null;
            }

            /// <summary>
            /// Disposes the subscription at most once by atomically swapping the field with null.
            /// </summary>
            private void DisposeSubscription() => Interlocked.Exchange(ref _subscription, null)?.Dispose();

            /// <summary>
            /// Cancellation callback: releases resources and makes the current (or next) wait in
            /// <see cref="MoveNextCore"/> observe a canceled task.
            /// </summary>
            /// <param name="state">Unused; the callback is registered with a null state.</param>
            private void OnCanceled(object? state)
            {
                var cancelledTcs = default(TaskCompletionSource<bool>);

                Dispose();

                while (true)
                {
                    var signal = Volatile.Read(ref _signal);

                    // A signal is installed: cancel it in place if it has not completed yet.
                    if (signal != null)
                    {
                        if (signal.TrySetCanceled(_cancellationToken))
                            return;
                    }

                    // Otherwise install an already-canceled signal (allocated lazily, once).
                    if (cancelledTcs == null)
                    {
                        cancelledTcs = new TaskCompletionSource<bool>();
                        cancelledTcs.TrySetCanceled(_cancellationToken);
                    }

                    // Replace whatever was observed; retry if the slot changed concurrently.
                    if (Interlocked.CompareExchange(ref _signal, cancelledTcs, signal) == signal)
                        return;
                }
            }

            /// <summary>
            /// Returns the task of the signal currently installed, installing a fresh signal
            /// first when the slot is empty.
            /// </summary>
            /// <returns>The task of the signal held in the slot.</returns>
            private Task Resume()
            {
                TaskCompletionSource<bool>? newSignal = null;

                while (true)
                {
                    var signal = Volatile.Read(ref _signal);

                    // A signal is already in the slot; wait on that one.
                    if (signal != null)
                    {
                        return signal.Task;
                    }

                    // Allocate at most one new signal across retries.
                    newSignal ??= new TaskCompletionSource<bool>();

                    if (Interlocked.CompareExchange(ref _signal, newSignal, null) == null)
                    {
                        return newSignal.Task;
                    }
                }
            }
        }
    }
}