1
0

AsyncEnumerable.cs 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. public static IAsyncEnumerable<T> CreateEnumerable<T>(Func<CancellationToken, IAsyncEnumerator<T>> getEnumerator)
  13. {
  14. if (getEnumerator == null)
  15. throw Error.ArgumentNull(nameof(getEnumerator));
  16. return new AnonymousAsyncEnumerable<T>(getEnumerator);
  17. }
  18. public static WithCancellationAsyncEnumerable<T> WithCancellation<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
  19. {
  20. if (source == null)
  21. throw Error.ArgumentNull(nameof(source));
  22. return new WithCancellationAsyncEnumerable<T>(source, cancellationToken);
  23. }
  24. private sealed class AnonymousAsyncEnumerable<T> : IAsyncEnumerable<T>
  25. {
  26. private readonly Func<CancellationToken, IAsyncEnumerator<T>> _getEnumerator;
  27. public AnonymousAsyncEnumerable(Func<CancellationToken, IAsyncEnumerator<T>> getEnumerator)
  28. {
  29. Debug.Assert(getEnumerator != null);
  30. _getEnumerator = getEnumerator;
  31. }
  32. public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken) => _getEnumerator(cancellationToken);
  33. }
  34. // REVIEW: Explicit implementation of the interfaces allows for composition with other "modifier operators" such as ConfigureAwait.
  35. // We expect that the "await foreach" statement will bind to the public struct methods, thus avoiding boxing.
  36. public readonly struct WithCancellationAsyncEnumerable<T> : IAsyncEnumerable<T>
  37. {
  38. private readonly IAsyncEnumerable<T> _source;
  39. private readonly CancellationToken _cancellationToken;
  40. public WithCancellationAsyncEnumerable(IAsyncEnumerable<T> source, CancellationToken cancellationToken)
  41. {
  42. _source = source;
  43. _cancellationToken = cancellationToken;
  44. }
  45. // REVIEW: Should we simply ignore the second cancellation token or should we link the two?
  46. public WithCancellationAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken)
  47. => new WithCancellationAsyncEnumerator(_source.GetAsyncEnumerator(_cancellationToken));
  48. IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(CancellationToken cancellationToken)
  49. => GetAsyncEnumerator(cancellationToken);
  50. public readonly struct WithCancellationAsyncEnumerator : IAsyncEnumerator<T>
  51. {
  52. private readonly IAsyncEnumerator<T> _enumerator;
  53. public WithCancellationAsyncEnumerator(IAsyncEnumerator<T> enumerator)
  54. {
  55. _enumerator = enumerator;
  56. }
  57. public T Current => _enumerator.Current;
  58. public ValueTask DisposeAsync() => _enumerator.DisposeAsync();
  59. public ValueTask<bool> MoveNextAsync() => _enumerator.MoveNextAsync();
  60. }
  61. }
  62. }
  63. }