AsyncEnumerable.Creation.cs 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Linq
  10. {
  11. public static partial class AsyncEnumerable
  12. {
  13. public static IAsyncEnumerable<T> Create<T>(Func<IAsyncEnumerator<T>> getEnumerator)
  14. {
  15. return new AnonymousAsyncEnumerable<T>(getEnumerator);
  16. }
  17. private class AnonymousAsyncEnumerable<T> : IAsyncEnumerable<T>
  18. {
  19. private Func<IAsyncEnumerator<T>> getEnumerator;
  20. public AnonymousAsyncEnumerable(Func<IAsyncEnumerator<T>> getEnumerator)
  21. {
  22. this.getEnumerator = getEnumerator;
  23. }
  24. public IAsyncEnumerator<T> GetEnumerator()
  25. {
  26. return getEnumerator();
  27. }
  28. }
  29. private static IAsyncEnumerator<T> Create<T>(Func<CancellationToken, Task<bool>> moveNext, Func<T> current,
  30. Action dispose, IDisposable enumerator)
  31. {
  32. return Create(async ct =>
  33. {
  34. using (ct.Register(dispose))
  35. {
  36. try
  37. {
  38. var result = await moveNext(ct).ConfigureAwait(false);
  39. if (!result)
  40. {
  41. enumerator?.Dispose();
  42. }
  43. return result;
  44. }
  45. catch
  46. {
  47. enumerator?.Dispose();
  48. throw;
  49. }
  50. }
  51. }, current, dispose);
  52. }
  53. public static IAsyncEnumerator<T> Create<T>(Func<CancellationToken, Task<bool>> moveNext, Func<T> current, Action dispose)
  54. {
  55. return new AnonymousAsyncEnumerator<T>(moveNext, current, dispose);
  56. }
  57. private static IAsyncEnumerator<T> Create<T>(Func<CancellationToken, TaskCompletionSource<bool>, Task<bool>> moveNext, Func<T> current, Action dispose)
  58. {
  59. var self = default(IAsyncEnumerator<T>);
  60. self = new AnonymousAsyncEnumerator<T>(
  61. async ct =>
  62. {
  63. var tcs = new TaskCompletionSource<bool>();
  64. var stop = new Action(() =>
  65. {
  66. self.Dispose();
  67. tcs.TrySetCanceled();
  68. });
  69. using (ct.Register(stop))
  70. {
  71. return await moveNext(ct, tcs).ConfigureAwait(false);
  72. }
  73. },
  74. current,
  75. dispose
  76. );
  77. return self;
  78. }
  79. private class AnonymousAsyncEnumerator<T> : IAsyncEnumerator<T>
  80. {
  81. private readonly Func<CancellationToken, Task<bool>> _moveNext;
  82. private readonly Func<T> _current;
  83. private readonly Action _dispose;
  84. private bool _disposed;
  85. public AnonymousAsyncEnumerator(Func<CancellationToken, Task<bool>> moveNext, Func<T> current, Action dispose)
  86. {
  87. _moveNext = moveNext;
  88. _current = current;
  89. _dispose = dispose;
  90. }
  91. public Task<bool> MoveNext(CancellationToken cancellationToken)
  92. {
  93. if (_disposed)
  94. return TaskExt.False;
  95. return _moveNext(cancellationToken);
  96. }
  97. public T Current
  98. {
  99. get
  100. {
  101. return _current();
  102. }
  103. }
  104. public void Dispose()
  105. {
  106. if (!_disposed)
  107. {
  108. _disposed = true;
  109. _dispose();
  110. }
  111. }
  112. }
  113. public static IAsyncEnumerable<TValue> Return<TValue>(TValue value)
  114. {
  115. return new[] { value }.ToAsyncEnumerable();
  116. }
  117. public static IAsyncEnumerable<TValue> Throw<TValue>(Exception exception)
  118. {
  119. if (exception == null)
  120. throw new ArgumentNullException(nameof(exception));
  121. return Create(() => Create<TValue>(
  122. ct => TaskExt.Throw<bool>(exception),
  123. () => { throw new InvalidOperationException(); },
  124. () => { })
  125. );
  126. }
  127. public static IAsyncEnumerable<TValue> Never<TValue>()
  128. {
  129. return Create(() => Create<TValue>(
  130. (ct, tcs) => tcs.Task,
  131. () => { throw new InvalidOperationException(); },
  132. () => { })
  133. );
  134. }
  135. public static IAsyncEnumerable<TValue> Empty<TValue>()
  136. {
  137. return Create(() => Create<TValue>(
  138. ct => TaskExt.False,
  139. () => { throw new InvalidOperationException(); },
  140. () => { })
  141. );
  142. }
  143. public static IAsyncEnumerable<int> Range(int start, int count)
  144. {
  145. if (count < 0)
  146. throw new ArgumentOutOfRangeException(nameof(count));
  147. return Enumerable.Range(start, count).ToAsyncEnumerable();
  148. }
  149. public static IAsyncEnumerable<TResult> Repeat<TResult>(TResult element, int count)
  150. {
  151. if (count < 0)
  152. throw new ArgumentOutOfRangeException(nameof(count));
  153. return Enumerable.Repeat(element, count).ToAsyncEnumerable();
  154. }
  155. public static IAsyncEnumerable<TResult> Repeat<TResult>(TResult element)
  156. {
  157. return Create(() =>
  158. {
  159. return Create(
  160. ct => TaskExt.True,
  161. () => element,
  162. () => { }
  163. );
  164. });
  165. }
  166. public static IAsyncEnumerable<TSource> Defer<TSource>(Func<IAsyncEnumerable<TSource>> factory)
  167. {
  168. if (factory == null)
  169. throw new ArgumentNullException(nameof(factory));
  170. return Create(() => factory().GetEnumerator());
  171. }
  172. public static IAsyncEnumerable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
  173. {
  174. if (condition == null)
  175. throw new ArgumentNullException(nameof(condition));
  176. if (iterate == null)
  177. throw new ArgumentNullException(nameof(iterate));
  178. if (resultSelector == null)
  179. throw new ArgumentNullException(nameof(resultSelector));
  180. return Create(() =>
  181. {
  182. var i = initialState;
  183. var started = false;
  184. var current = default(TResult);
  185. return Create(
  186. ct =>
  187. {
  188. var b = false;
  189. try
  190. {
  191. if (started)
  192. i = iterate(i);
  193. b = condition(i);
  194. if (b)
  195. current = resultSelector(i);
  196. }
  197. catch (Exception ex)
  198. {
  199. return TaskExt.Throw<bool>(ex);
  200. }
  201. if (!b)
  202. return TaskExt.False;
  203. if (!started)
  204. started = true;
  205. return TaskExt.True;
  206. },
  207. () => current,
  208. () => { }
  209. );
  210. });
  211. }
  212. public static IAsyncEnumerable<TSource> Using<TSource, TResource>(Func<TResource> resourceFactory, Func<TResource, IAsyncEnumerable<TSource>> enumerableFactory) where TResource : IDisposable
  213. {
  214. if (resourceFactory == null)
  215. throw new ArgumentNullException(nameof(resourceFactory));
  216. if (enumerableFactory == null)
  217. throw new ArgumentNullException(nameof(enumerableFactory));
  218. return Create(() =>
  219. {
  220. var resource = resourceFactory();
  221. var e = default(IAsyncEnumerator<TSource>);
  222. try
  223. {
  224. e = enumerableFactory(resource).GetEnumerator();
  225. }
  226. catch (Exception)
  227. {
  228. resource.Dispose();
  229. throw;
  230. }
  231. var cts = new CancellationTokenDisposable();
  232. var d = Disposable.Create(cts, resource, e);
  233. var current = default(TSource);
  234. return Create(
  235. async ct =>
  236. {
  237. bool res;
  238. try
  239. {
  240. res = await e.MoveNext(cts.Token).ConfigureAwait(false);
  241. }
  242. catch (Exception)
  243. {
  244. d.Dispose();
  245. throw;
  246. }
  247. if (res)
  248. {
  249. current = e.Current;
  250. return true;
  251. }
  252. d.Dispose();
  253. return false;
  254. },
  255. () => current,
  256. d.Dispose,
  257. null
  258. );
  259. });
  260. }
  261. }
  262. }