// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.Linq
{
public static partial class AsyncEnumerable
{
    /// <summary>
    /// Converts an observable sequence to an async-enumerable sequence.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">Observable sequence to convert to an async-enumerable sequence.</param>
    /// <returns>The async-enumerable sequence whose elements are pulled from the given observable sequence.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source)
    {
        if (source == null)
        {
            throw Error.ArgumentNull(nameof(source));
        }

        return new ObservableAsyncEnumerable<TSource>(source);
    }

    /// <summary>
    /// Async iterator that subscribes itself as an observer to the source sequence on the first
    /// move-next call and yields the values its observer callbacks have queued.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    private sealed class ObservableAsyncEnumerable<TSource> : AsyncIterator<TSource>, IObserver<TSource>
    {
        // Observable sequence that MoveNextCore subscribes to.
        private readonly IObservable<TSource> _source;

        // Values buffered by OnNext until the consumer dequeues them; set to null by Dispose.
        private ConcurrentQueue<TSource>? _values = new ConcurrentQueue<TSource>();

        // Error recorded by OnError; rethrown by MoveNextCore once the queue is drained.
        private Exception? _error;

        // Set by OnCompleted and OnError; accessed through Volatile.Read/Volatile.Write.
        private bool _completed;

        // Wake-up handshake between the consumer (Resume) and the notifying side
        // (OnNotification, OnCanceled); only touched through Volatile/Interlocked operations.
        private TaskCompletionSource<bool>? _signal;

        // Subscription to _source; disposed at most once through DisposeSubscription.
        private IDisposable? _subscription;

        // Registration of OnCanceled with the iterator's cancellation token.
        private CancellationTokenRegistration _ctr;

        /// <summary>
        /// Creates an iterator over the specified observable sequence. No subscription is made here.
        /// </summary>
        /// <param name="source">Observable sequence to pull elements from.</param>
        public ObservableAsyncEnumerable(IObservable<TSource> source) => _source = source;

        /// <summary>
        /// Creates a fresh iterator over the same observable sequence.
        /// </summary>
        /// <returns>A new, not yet subscribed, iterator instance.</returns>
        public override AsyncIteratorBase<TSource> Clone() => new ObservableAsyncEnumerable<TSource>(_source);

        /// <summary>
        /// Releases the cancellation registration, the subscription and the buffered state, then
        /// defers to the base class.
        /// </summary>
        public override ValueTask DisposeAsync()
        {
            Dispose();

            return base.DisposeAsync();
        }

        /// <summary>
        /// Advances to the next buffered value, subscribing to the source on the first call and
        /// waiting for a notification when the queue is empty.
        /// </summary>
        /// <returns>
        /// <c>true</c> if a value was dequeued into the current element; <c>false</c> if the source
        /// completed and the queue is drained.
        /// </returns>
        /// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
        protected override async ValueTask<bool> MoveNextCore()
        {
            //
            // REVIEW: How often should we check? At the very least, we want to prevent
            //         subscribing if cancellation is requested. A case may be made to
            //         check for each iteration, namely because this operator is a bridge
            //         with another interface. However, we also wire up cancellation to
            //         the observable subscription, so there's redundancy here.
            //
            _cancellationToken.ThrowIfCancellationRequested();

            switch (_state)
            {
                case AsyncIteratorState.Allocated:
                    //
                    // NB: Breaking change to align with lazy nature of async iterators.
                    //
                    //     In previous implementations, the Subscribe call happened during
                    //     the call to GetAsyncEnumerator.
                    //
                    // REVIEW: Confirm this design point. This implementation is compatible
                    //         with an async iterator using "yield return", e.g. subscribing
                    //         to the observable sequence and yielding values out of a local
                    //         queue filled by observer callbacks. However, it departs from
                    //         the dual treatment of Subscribe/GetEnumerator.
                    //
                    _subscription = _source.Subscribe(this);
                    _ctr = _cancellationToken.Register(OnCanceled, state: null);
                    _state = AsyncIteratorState.Iterating;
                    goto case AsyncIteratorState.Iterating;

                case AsyncIteratorState.Iterating:
                    while (true)
                    {
                        // The completion flag is read before the dequeue attempt; presumably so that
                        // an empty queue observed afterwards really means "drained" once completed.
                        var completed = Volatile.Read(ref _completed);

                        // Dispose (run by OnCanceled on another thread, or by DisposeAsync) nulls the
                        // queue. Snapshot it so a concurrent dispose surfaces as cancellation or as
                        // end-of-sequence instead of a NullReferenceException.
                        var values = Volatile.Read(ref _values);

                        if (values == null)
                        {
                            _cancellationToken.ThrowIfCancellationRequested();
                            return false;
                        }

                        if (values.TryDequeue(out _current!))
                        {
                            return true;
                        }
                        else if (completed)
                        {
                            var error = _error;

                            if (error != null)
                            {
                                // Rethrow the exception handed to OnError without resetting the
                                // stack trace it was originally thrown with.
                                ExceptionDispatchInfo.Capture(error).Throw();
                            }

                            return false;
                        }

                        await Resume().ConfigureAwait(false);

                        // Reset the handshake so the next Resume call allocates a fresh signal. This may
                        // overwrite a cancelled signal installed by OnCanceled after the await finished;
                        // the null-queue check at the top of the loop covers that case.
                        Volatile.Write(ref _signal, null);
                    }
            }

            await DisposeAsync().ConfigureAwait(false);
            return false;
        }

        /// <summary>
        /// Marks the sequence as completed, disposes the subscription and wakes up the consumer.
        /// </summary>
        public void OnCompleted()
        {
            Volatile.Write(ref _completed, true);

            DisposeSubscription();
            OnNotification();
        }

        /// <summary>
        /// Records the error, marks the sequence as completed, disposes the subscription and wakes
        /// up the consumer.
        /// </summary>
        /// <param name="error">The error reported by the source sequence.</param>
        public void OnError(Exception error)
        {
            // The error is stored before the completion flag is published with Volatile.Write.
            _error = error;
            Volatile.Write(ref _completed, true);

            DisposeSubscription();
            OnNotification();
        }

        /// <summary>
        /// Buffers the value (unless the queue was already released by Dispose) and wakes up the consumer.
        /// </summary>
        /// <param name="value">The element produced by the source sequence.</param>
        public void OnNext(TSource value)
        {
            _values?.Enqueue(value);

            OnNotification();
        }

        /// <summary>
        /// Signals the consumer that the queue or the completion state has changed.
        /// </summary>
        private void OnNotification()
        {
            while (true)
            {
                var signal = Volatile.Read(ref _signal);

                // A notification is already pending; nothing more to do.
                if (signal == TaskExt.True)
                {
                    return;
                }

                // Some other signal is installed (by Resume or OnCanceled); try to complete it.
                if (signal != null)
                {
                    signal.TrySetResult(true);
                    return;
                }

                // No signal installed: leave the shared TaskExt.True marker behind so the next
                // Resume call picks it up (presumably an already-completed source - TODO confirm
                // against TaskExt). Retry if another thread installed a signal first.
                if (Interlocked.CompareExchange(ref _signal, TaskExt.True, null) == null)
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Releases the cancellation registration, the subscription, the value queue and the recorded error.
        /// </summary>
        private void Dispose()
        {
            _ctr.Dispose();
            DisposeSubscription();

            _values = null;
            _error = null;
        }

        /// <summary>
        /// Disposes the subscription at most once by atomically swapping the field with null.
        /// </summary>
        private void DisposeSubscription() => Interlocked.Exchange(ref _subscription, null)?.Dispose();

        /// <summary>
        /// Cancellation callback: releases resources and makes sure the signal observed by the
        /// consumer is in the canceled state.
        /// </summary>
        /// <param name="state">Unused; the callback is registered with a null state.</param>
        private void OnCanceled(object? state)
        {
            TaskCompletionSource<bool>? cancelledTcs = null;

            Dispose();

            while (true)
            {
                var signal = Volatile.Read(ref _signal);

                if (signal != null)
                {
                    // Succeeds only if the installed signal has not been completed yet.
                    if (signal.TrySetCanceled(_cancellationToken))
                    {
                        return;
                    }
                }

                // Either no signal is installed or it was already completed: swap in a signal
                // that is canceled up front, allocating it only once across retries.
                if (cancelledTcs == null)
                {
                    cancelledTcs = new TaskCompletionSource<bool>();
                    cancelledTcs.TrySetCanceled(_cancellationToken);
                }

                if (Interlocked.CompareExchange(ref _signal, cancelledTcs, signal) == signal)
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Gets the task the consumer awaits until the next notification or cancellation.
        /// </summary>
        /// <returns>The task of the currently installed signal, or of a newly installed one.</returns>
        private Task Resume()
        {
            TaskCompletionSource<bool>? newSignal = null;

            while (true)
            {
                var signal = Volatile.Read(ref _signal);

                // A signal was already installed by OnNotification or OnCanceled; await that one.
                if (signal != null)
                {
                    return signal.Task;
                }

                // NOTE(review): the source is created without RunContinuationsAsynchronously, so the
                // consumer's continuation may run inline on the notifying thread - confirm intended.
                newSignal ??= new TaskCompletionSource<bool>();

                if (Interlocked.CompareExchange(ref _signal, newSignal, null) == null)
                {
                    return newSignal.Task;
                }
            }
        }
    }
}
}