ToAsyncEnumerable.Observable.cs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  /// <summary>
  /// Wraps an observable sequence in an async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <param name="source">Observable sequence to expose as an async-enumerable sequence.</param>
  /// <returns>
  /// An async-enumerable sequence backed by <paramref name="source"/>. No subscription is made
  /// by this call; the returned iterator subscribes when it is first advanced.
  /// </returns>
  /// <remarks>
  /// When <paramref name="source"/> is <c>null</c>, the exception created by
  /// <c>Error.ArgumentNull</c> is thrown (presumably an <c>ArgumentNullException</c>;
  /// the helper is declared elsewhere).
  /// </remarks>
  public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source) =>
      new ObservableAsyncEnumerable<TSource>(source ?? throw Error.ArgumentNull(nameof(source)));
  /// <summary>
  /// Async iterator that adapts a push-based <see cref="IObservable{T}"/> to a pull-based
  /// async sequence. The iterator itself is the observer: values pushed by the source are
  /// buffered in a queue and handed out one at a time by <see cref="MoveNextCore"/>.
  /// </summary>
  /// <remarks>
  /// Observer callbacks and the cancellation callback may run on a different thread than the
  /// consumer, so the shared fields are accessed through <see cref="Volatile"/> and
  /// <see cref="Interlocked"/>. <c>_state</c>, <c>_current</c> and <c>_cancellationToken</c>
  /// are inherited members declared outside this file.
  /// </remarks>
  private sealed class ObservableAsyncEnumerable<TSource> : AsyncIterator<TSource>, IObserver<TSource>
  {
      private readonly IObservable<TSource> _source;

      // Values received through OnNext that the consumer has not fetched yet.
      // Set to null by Dispose, possibly from the cancellation callback.
      private ConcurrentQueue<TSource> _values = new ConcurrentQueue<TSource>();

      // Error received through OnError; rethrown by MoveNextCore once the queue is drained.
      private Exception _error;

      // Set (with a volatile write) by OnCompleted and OnError.
      private bool _completed;

      // Producer/consumer handshake:
      //   null         - no notification pending and nobody waiting;
      //   TaskExt.True - a notification arrived while nobody was waiting (TaskExt is declared
      //                  elsewhere; presumably an already-completed source);
      //   anything else - a source installed by Resume for the consumer to wait on, or an
      //                  already-canceled source installed by OnCanceled.
      private TaskCompletionSource<bool> _signal;

      private IDisposable _subscription;
      private CancellationTokenRegistration _ctr;

      public ObservableAsyncEnumerable(IObservable<TSource> source) => _source = source;

      /// <summary>Creates a fresh, unsubscribed iterator over the same source.</summary>
      public override AsyncIteratorBase<TSource> Clone() => new ObservableAsyncEnumerable<TSource>(_source);

      /// <summary>Releases the subscription and buffers, then runs the base disposal.</summary>
      public override ValueTask DisposeAsync()
      {
          Dispose();
          return base.DisposeAsync();
      }

      /// <summary>
      /// Subscribes on first use, then returns the next buffered value, waiting for the
      /// source to push one when the buffer is empty.
      /// </summary>
      /// <returns>
      /// <c>true</c> when a value was stored in <c>_current</c>; <c>false</c> when the source
      /// completed and the buffer is drained.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          //
          // REVIEW: How often should we check? At the very least, we want to prevent
          //         subscribing if cancellation is requested. A case may be made to
          //         check for each iteration, namely because this operator is a bridge
          //         with another interface. However, we also wire up cancellation to
          //         the observable subscription, so there's redundancy here.
          //
          _cancellationToken.ThrowIfCancellationRequested();

          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  //
                  // NB: Breaking change to align with lazy nature of async iterators.
                  //
                  //     In previous implementations, the Subscribe call happened during
                  //     the call to GetAsyncEnumerator.
                  //
                  // REVIEW: Confirm this design point. This implementation is compatible
                  //         with an async iterator using "yield return", e.g. subscribing
                  //         to the observable sequence and yielding values out of a local
                  //         queue filled by observer callbacks. However, it departs from
                  //         the dual treatment of Subscribe/GetEnumerator.
                  //
                  _subscription = _source.Subscribe(this);
                  _ctr = _cancellationToken.Register(OnCanceled, state: null);
                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  while (true)
                  {
                      // Read the completion flag BEFORE looking at the queue: if it is set,
                      // every value the source produced is already enqueued.
                      var completed = Volatile.Read(ref _completed);

                      // Dispose may have cleared the queue concurrently (cancellation callback).
                      // Surface that as cancellation, or as end-of-sequence, rather than
                      // dereferencing null.
                      var values = Volatile.Read(ref _values);

                      if (values == null)
                      {
                          _cancellationToken.ThrowIfCancellationRequested();
                          return false;
                      }

                      if (values.TryDequeue(out _current))
                      {
                          return true;
                      }
                      else if (completed)
                      {
                          var error = _error;

                          if (error != null)
                          {
                              throw error;
                          }

                          return false;
                      }

                      // Nothing buffered yet: wait for the next notification. If cancellation
                      // was signaled, the awaited task is canceled and this await throws.
                      await Resume().ConfigureAwait(false);

                      // Reset the handshake before re-checking the queue, so a notification
                      // that arrives from here on is not lost.
                      Volatile.Write(ref _signal, null);
                  }
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }

      /// <summary>Marks the sequence as completed and wakes the consumer.</summary>
      public void OnCompleted()
      {
          Volatile.Write(ref _completed, true);

          DisposeSubscription();
          OnNotification();
      }

      /// <summary>Records the error, marks the sequence as completed and wakes the consumer.</summary>
      public void OnError(Exception error)
      {
          // The plain write is published by the volatile write of _completed below.
          _error = error;
          Volatile.Write(ref _completed, true);

          DisposeSubscription();
          OnNotification();
      }

      /// <summary>Buffers the value (unless already disposed) and wakes the consumer.</summary>
      public void OnNext(TSource value)
      {
          _values?.Enqueue(value);

          OnNotification();
      }

      /// <summary>
      /// Signals the consumer that something changed: completes the source it is waiting on,
      /// or leaves a "notification pending" marker if nobody is waiting yet.
      /// </summary>
      private void OnNotification()
      {
          while (true)
          {
              var signal = Volatile.Read(ref _signal);

              if (signal == TaskExt.True)
              {
                  // A notification is already pending.
                  return;
              }

              if (signal != null)
              {
                  // No effect if the source was already completed or canceled.
                  signal.TrySetResult(true);
                  return;
              }

              if (Interlocked.CompareExchange(ref _signal, TaskExt.True, null) == null)
              {
                  return;
              }

              // Lost the race against Resume/OnCanceled; re-read and retry.
          }
      }

      /// <summary>Releases the cancellation registration, the subscription and the buffers.</summary>
      private void Dispose()
      {
          _ctr.Dispose();
          DisposeSubscription();

          Volatile.Write(ref _values, null);
          _error = null;
      }

      // Disposes the subscription at most once, regardless of which thread gets here first.
      private void DisposeSubscription() => Interlocked.Exchange(ref _subscription, null)?.Dispose();

      /// <summary>
      /// Cancellation callback: tears down the subscription and makes sure that a consumer
      /// that is waiting (or about to wait) in <see cref="Resume"/> observes cancellation.
      /// </summary>
      /// <remarks>
      /// Disposing alone is not enough: once unsubscribed, the source no longer sends
      /// notifications, so a pending wait would otherwise never complete.
      /// </remarks>
      private void OnCanceled(object state)
      {
          Dispose();

          TaskCompletionSource<bool> canceledSignal = null;

          while (true)
          {
              var signal = Volatile.Read(ref _signal);

              // A consumer-owned source that is still pending: cancel it in place.
              // TaskExt.True is shared, so it is never touched here.
              if (signal != null && signal != TaskExt.True && signal.TrySetCanceled(_cancellationToken))
              {
                  return;
              }

              // Otherwise (no source, the pending marker, or a source that already completed)
              // install an already-canceled source so the next wait throws immediately.
              if (canceledSignal == null)
              {
                  canceledSignal = new TaskCompletionSource<bool>();
                  canceledSignal.TrySetCanceled(_cancellationToken);
              }

              if (Interlocked.CompareExchange(ref _signal, canceledSignal, signal) == signal)
              {
                  return;
              }
          }
      }

      /// <summary>
      /// Returns a task that completes at the next notification, immediately if one is
      /// already pending, or as canceled if cancellation was signaled.
      /// </summary>
      private Task Resume()
      {
          TaskCompletionSource<bool> newSignal = null;

          while (true)
          {
              var signal = Volatile.Read(ref _signal);

              if (signal != null)
              {
                  return signal.Task;
              }

              if (newSignal == null)
              {
                  // Run the consumer's continuation asynchronously so it does not execute
                  // inline on the producer's thread inside OnNext/OnCompleted/OnError.
                  newSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
              }

              if (Interlocked.CompareExchange(ref _signal, newSignal, null) == null)
              {
                  return newSignal.Task;
              }
          }
      }
  }
  155. }
  156. }