ToObservable.cs 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading.Tasks;
  6. namespace System.Linq
  7. {
  8. public static partial class AsyncEnumerable
  9. {
  /// <summary>
  /// Exposes an observable sequence as an async-enumerable sequence.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements in the sequence.</typeparam>
  /// <param name="source">Observable sequence to adapt.</param>
  /// <returns>The async-enumerable built by <c>CreateEnumerable</c> from the factory below.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      return CreateEnumerable(
          () =>
          {
              // Every invocation of this factory subscribes a fresh observer to the source.
              var sink = new ToAsyncEnumerableObserver<TSource>();
              var registration = source.Subscribe(sink);

              return CreateEnumerator(
                  (ct, tcs) =>
                  {
                      // null: nothing decided yet; true: a value was dequeued; false: the source completed.
                      bool? outcome = null;
                      Exception failure = null;

                      lock (sink.SyncRoot)
                      {
                          if (sink.Values.Count > 0)
                          {
                              // Buffered values are drained before completion or an error is reported.
                              sink.Current = sink.Values.Dequeue();
                              outcome = true;
                          }
                          else if (sink.HasCompleted)
                          {
                              outcome = false;
                          }
                          else if (sink.Error != null)
                          {
                              failure = sink.Error;
                          }
                          else
                          {
                              // Nothing available yet: park the completion source on the observer,
                              // whose next notification will complete it.
                              sink.TaskCompletionSource = tcs;
                          }
                      }

                      // The completion source is signalled only after the lock has been released.
                      if (outcome.HasValue)
                      {
                          tcs.TrySetResult(outcome.Value);
                      }
                      else if (failure != null)
                      {
                          tcs.TrySetException(failure);
                      }

                      return tcs.Task;
                  },
                  () => sink.Current,
                  () =>
                  {
                      registration.Dispose();
                      // Should we cancel in-flight operations somehow?
                  });
          });
  }
  /// <summary>
  /// Wraps an async-enumerable sequence in an <see cref="IObservable{T}"/> adapter.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements in the sequence.</typeparam>
  /// <param name="source">Async-enumerable sequence to adapt.</param>
  /// <returns>A new <c>ToObservableObservable</c> over <paramref name="source"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TSource> ToObservable<TSource>(this IAsyncEnumerable<TSource> source)
  {
      return new ToObservableObservable<TSource>(
          source ?? throw new ArgumentNullException(nameof(source)));
  }
  /// <summary>
  /// Observer that records notifications for a pulling consumer. A value is queued in
  /// <see cref="Values"/> when no completion source is parked in <see cref="TaskCompletionSource"/>;
  /// otherwise the notification is handed straight to the parked completion source.
  /// All state is read and written under a lock on <see cref="SyncRoot"/>.
  /// </summary>
  private class ToAsyncEnumerableObserver<T> : IObserver<T>
  {
      // Values received while no completion source was parked.
      public readonly Queue<T> Values = new Queue<T>();

      // Most recent value handed over to the consumer.
      public T Current;

      // Exception passed to OnError, if any.
      public Exception Error;

      // Set once OnCompleted has been called.
      public bool HasCompleted;

      // Completion source parked by the consumer while it waits for the next notification.
      public TaskCompletionSource<bool> TaskCompletionSource;

      // The queue instance doubles as the lock object.
      public object SyncRoot => Values;

      /// <summary>Marks the sequence as completed and completes a parked wait with <c>false</c>.</summary>
      public void OnCompleted()
      {
          TaskCompletionSource<bool> waiter;

          lock (SyncRoot)
          {
              HasCompleted = true;
              waiter = DetachWaiter();
          }

          // Signalled outside the lock.
          waiter?.TrySetResult(false);
      }

      /// <summary>Stores <paramref name="error"/> and faults a parked wait with it.</summary>
      public void OnError(Exception error)
      {
          TaskCompletionSource<bool> waiter;

          lock (SyncRoot)
          {
              Error = error;
              waiter = DetachWaiter();
          }

          waiter?.TrySetException(error);
      }

      /// <summary>
      /// Hands <paramref name="value"/> to a parked wait (via <see cref="Current"/>) when there is one;
      /// otherwise appends it to <see cref="Values"/>.
      /// </summary>
      public void OnNext(T value)
      {
          TaskCompletionSource<bool> waiter;

          lock (SyncRoot)
          {
              waiter = TaskCompletionSource;

              if (waiter != null)
              {
                  Current = value;
                  TaskCompletionSource = null;
              }
              else
              {
                  Values.Enqueue(value);
              }
          }

          waiter?.TrySetResult(true);
      }

      // Returns the parked completion source (or null) and clears the field. Call with SyncRoot held.
      private TaskCompletionSource<bool> DetachWaiter()
      {
          var waiter = TaskCompletionSource;
          TaskCompletionSource = null;
          return waiter;
      }
  }
  /// <summary>
  /// Observable adapter over an async-enumerable sequence. Each call to
  /// <see cref="Subscribe"/> obtains its own enumerator from the source and forwards
  /// its elements to the observer.
  /// </summary>
  private class ToObservableObservable<T> : IObservable<T>
  {
      private readonly IAsyncEnumerable<T> _source;

      public ToObservableObservable(IAsyncEnumerable<T> source)
      {
          _source = source;
      }

      /// <summary>
      /// Starts enumerating the source and forwards each element to <paramref name="observer"/>.
      /// </summary>
      /// <param name="observer">Receiver of the OnNext / OnError / OnCompleted notifications.</param>
      /// <returns>
      /// A disposable created from the cancellation source and the enumerator; disposing it
      /// is what requests cancellation of the forwarding loop.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          // Fail fast, before any resource is acquired. Without this check a null observer
          // only surfaces as a NullReferenceException inside the task continuation, where
          // it goes unobserved and the enumerator is never disposed by the loop.
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          var ctd = new CancellationTokenDisposable();
          var e = _source.GetEnumerator();

          // Requests the next element and handles the outcome in a continuation; the
          // continuation calls this function again for as long as elements keep arriving.
          void MoveNextAndForward() => e.MoveNext(ctd.Token)
              .ContinueWith(t =>
              {
                  if (t.IsFaulted)
                  {
                      // t.Exception is the task's AggregateException; it is forwarded as-is.
                      observer.OnError(t.Exception);
                      e.Dispose();
                  }
                  else if (t.IsCanceled)
                  {
                      e.Dispose();
                  }
                  else if (t.Result)
                  {
                      // The antecedent task has finished by the time a continuation runs,
                      // so reading Result here does not block.
                      observer.OnNext(e.Current);

                      if (!ctd.Token.IsCancellationRequested)
                      {
                          MoveNextAndForward();
                      }

                      // In case cancellation is requested, this could only have happened
                      // by disposing the returned composite disposable (see below).
                      // In that case, e will be disposed too, so there is no need to dispose e here.
                  }
                  else
                  {
                      observer.OnCompleted();
                      e.Dispose();
                  }
              }, ctd.Token);

          MoveNextAndForward();

          return Disposable.Create(ctd, e);
      }
  }
  195. }
  196. }