| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System.Collections.Generic;using System.Threading;using System.Threading.Tasks;namespace System.Linq{    public static partial class AsyncEnumerable    {        internal abstract class AsyncIterator<TSource> : IAsyncEnumerable<TSource>, IAsyncEnumerator<TSource>        {            private readonly int threadId;            private CancellationTokenSource cancellationTokenSource;            private bool currentIsInvalid = true;            internal TSource current;            internal AsyncIteratorState state = AsyncIteratorState.New;            protected AsyncIterator()            {                threadId = Environment.CurrentManagedThreadId;            }            public IAsyncEnumerator<TSource> GetAsyncEnumerator()            {                var enumerator = state == AsyncIteratorState.New && threadId == Environment.CurrentManagedThreadId ?                    this :                    Clone();                enumerator.state = AsyncIteratorState.Allocated;                enumerator.cancellationTokenSource = new CancellationTokenSource();                try                {                    enumerator.OnGetEnumerator();                }                catch                {                    enumerator.DisposeAsync(); // REVIEW: fire-and-forget?                    throw;                }                return enumerator;            }            public virtual Task DisposeAsync()            {                if (cancellationTokenSource != null)                {                    if (!cancellationTokenSource.IsCancellationRequested)                    {                        cancellationTokenSource.Cancel();                    }                    cancellationTokenSource.Dispose();                }                current = default(TSource);                state = AsyncIteratorState.Disposed;                return TaskExt.CompletedTask;            }            public TSource Current            {                get                {                    if (currentIsInvalid)                        throw new InvalidOperationException("Enumerator is in an invalid state");                    return current;                }            }            public async Task<bool> MoveNextAsync()            {                // Note: MoveNext *must* be implemented as an async method to ensure                // that any exceptions thrown from the MoveNextCore call are handled                 // by the try/catch, whether they're sync or async                if (state == AsyncIteratorState.Disposed)                {                    return false;                }                try                {                    var result = await MoveNextCore().ConfigureAwait(false);                    currentIsInvalid = !result; // if move next is false, invalid otherwise valid                    return result;                }                catch                {                    currentIsInvalid = true;                    await DisposeAsync().ConfigureAwait(false);                    throw;                }            }            public abstract AsyncIterator<TSource> Clone();            public virtual IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, TResult> selector)            {                return new SelectEnumerableAsyncIterator<TSource, TResult>(this, selector);            }            public virtual IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, Task<TResult>> selector)            {                return new SelectEnumerableAsyncIteratorWithTask<TSource, TResult>(this, selector);            }            public virtual IAsyncEnumerable<TSource> Where(Func<TSource, bool> predicate)            {                return new WhereEnumerableAsyncIterator<TSource>(this, predicate);            }            public virtual IAsyncEnumerable<TSource> Where(Func<TSource, Task<bool>> predicate)            {                return new WhereEnumerableAsyncIteratorWithTask<TSource>(this, predicate);            }            protected abstract Task<bool> MoveNextCore();            protected virtual void OnGetEnumerator()            {            }        }        internal enum AsyncIteratorState        {            New = 0,            Allocated = 1,            Iterating = 2,            Disposed = -1        }    }}
 |