123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- namespace System.Linq
- {
- public static partial class AsyncEnumerable
- {
- internal abstract class AsyncIterator<TSource> : IAsyncEnumerable<TSource>, IAsyncEnumerator<TSource>
- {
- public enum State
- {
- New = 0,
- Allocated = 1,
- Iterating = 2,
- Disposed = -1,
- }
- private readonly int threadId;
- internal State state = State.New;
- internal TSource current;
- private CancellationTokenSource cancellationTokenSource;
- protected AsyncIterator()
- {
- threadId = Environment.CurrentManagedThreadId;
- }
- public abstract AsyncIterator<TSource> Clone();
- public IAsyncEnumerator<TSource> GetEnumerator()
- {
- var enumerator = state == State.New && threadId == Environment.CurrentManagedThreadId ? this : Clone();
- enumerator.state = State.Allocated;
- enumerator.cancellationTokenSource = new CancellationTokenSource();
- return enumerator;
- }
-
- public virtual void Dispose()
- {
- if (!cancellationTokenSource.IsCancellationRequested)
- {
- cancellationTokenSource.Cancel();
- }
- cancellationTokenSource.Dispose();
- current = default(TSource);
- state = State.Disposed;
- }
- private void Cancel()
- {
- if (!cancellationTokenSource.IsCancellationRequested)
- {
- cancellationTokenSource.Cancel();
- }
- Dispose();
- Debug.WriteLine("Canceled");
- }
- public TSource Current => current;
- public async Task<bool> MoveNext(CancellationToken cancellationToken)
- {
- // using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cancellationTokenSource.Token))
- using (cancellationToken.Register(Cancel))
- {
- try
- {
- var result = await MoveNextCore(cancellationTokenSource.Token).ConfigureAwait(false);
- // cts.Dispose();
- //if (cts.IsCancellationRequested)
- //{
- // Dispose();
- //}
- return result;
- }
- catch
- {
- Dispose();
- throw;
- }
- }
- }
- public abstract Task<bool> MoveNextCore(CancellationToken cancellationToken);
- public virtual IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, TResult> selector)
- {
- return new SelectEnumerableAsyncIterator<TSource, TResult>(this, selector);
- }
- public virtual IAsyncEnumerable<TSource> Where(Func<TSource, bool> predicate)
- {
- return new WhereEnumerableAsyncIterator<TSource>(this, predicate);
- }
- }
- }
- }
|