AsyncIterator.cs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. internal enum AsyncIteratorState
  12. {
  13. New = 0,
  14. Allocated = 1,
  15. Iterating = 2,
  16. Disposed = -1,
  17. }
  18. internal abstract class AsyncIterator<TSource> : IAsyncEnumerable<TSource>, IAsyncEnumerator<TSource>
  19. {
  20. private readonly int threadId;
  21. internal AsyncIteratorState state = AsyncIteratorState.New;
  22. internal TSource current;
  23. private CancellationTokenSource cancellationTokenSource;
  24. private bool currentIsInvalid = true;
  25. protected AsyncIterator()
  26. {
  27. threadId = Environment.CurrentManagedThreadId;
  28. }
  29. public abstract AsyncIterator<TSource> Clone();
  30. public IAsyncEnumerator<TSource> GetEnumerator()
  31. {
  32. var enumerator = state == AsyncIteratorState.New && threadId == Environment.CurrentManagedThreadId ? this : Clone();
  33. enumerator.state = AsyncIteratorState.Allocated;
  34. enumerator.cancellationTokenSource = new CancellationTokenSource();
  35. return enumerator;
  36. }
  37. public virtual void Dispose()
  38. {
  39. if (!cancellationTokenSource.IsCancellationRequested)
  40. {
  41. cancellationTokenSource.Cancel();
  42. }
  43. cancellationTokenSource.Dispose();
  44. current = default(TSource);
  45. state = AsyncIteratorState.Disposed;
  46. }
  47. public TSource Current
  48. {
  49. get
  50. {
  51. if (currentIsInvalid)
  52. throw new InvalidOperationException("Enumerator is in an invalid state");
  53. return current;
  54. }
  55. }
  56. public async Task<bool> MoveNext(CancellationToken cancellationToken)
  57. {
  58. if (state == AsyncIteratorState.Disposed)
  59. {
  60. return false;
  61. }
  62. using (cancellationToken.Register(Dispose))
  63. using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cancellationTokenSource.Token))
  64. {
  65. try
  66. {
  67. // Short circuit and don't even call MoveNexCore
  68. cancellationToken.ThrowIfCancellationRequested();
  69. var result = await MoveNextCore(cts.Token).ConfigureAwait(false);
  70. currentIsInvalid = !result; // if move next is false, invalid otherwise valid
  71. return result;
  72. }
  73. catch
  74. {
  75. currentIsInvalid = true;
  76. Dispose();
  77. throw;
  78. }
  79. }
  80. }
  81. protected abstract Task<bool> MoveNextCore(CancellationToken cancellationToken);
  82. public virtual IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, TResult> selector)
  83. {
  84. return new SelectEnumerableAsyncIterator<TSource, TResult>(this, selector);
  85. }
  86. public virtual IAsyncEnumerable<TSource> Where(Func<TSource, bool> predicate)
  87. {
  88. return new WhereEnumerableAsyncIterator<TSource>(this, predicate);
  89. }
  90. }
  91. }
  92. }