1
0

AsyncEnumerableHelpers.cs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Collections.Generic
  8. {
  9. // Based on https://github.com/dotnet/corefx/blob/ec2685715b01d12f16b08d0dfa326649b12db8ec/src/Common/src/System/Collections/Generic/EnumerableHelpers.cs
  10. internal static class AsyncEnumerableHelpers
  11. {
  12. internal static async ValueTask<T[]> ToArray<T>(IAsyncEnumerable<T> source, CancellationToken cancellationToken)
  13. {
  14. var result = await ToArrayWithLength(source, cancellationToken).ConfigureAwait(false);
  15. Array.Resize(ref result.Array, result.Length);
  16. return result.Array;
  17. }
  18. internal static async ValueTask<ArrayWithLength<T>> ToArrayWithLength<T>(IAsyncEnumerable<T> source, CancellationToken cancellationToken)
  19. {
  20. cancellationToken.ThrowIfCancellationRequested();
  21. var result = new ArrayWithLength<T>();
  22. // Check for short circuit optimizations. This one is very unlikely
  23. // but could be here as a group
  24. if (source is ICollection<T> ic)
  25. {
  26. var count = ic.Count;
  27. if (count != 0)
  28. {
  29. // Allocate an array of the desired size, then copy the elements into it. Note that this has the same
  30. // issue regarding concurrency as other existing collections like List<T>. If the collection size
  31. // concurrently changes between the array allocation and the CopyTo, we could end up either getting an
  32. // exception from overrunning the array (if the size went up) or we could end up not filling as many
  33. // items as 'count' suggests (if the size went down). This is only an issue for concurrent collections
  34. // that implement ICollection<T>, which as of .NET 4.6 is just ConcurrentDictionary<TKey, TValue>.
  35. result.Array = new T[count];
  36. ic.CopyTo(result.Array, 0);
  37. result.Length = count;
  38. return result;
  39. }
  40. }
  41. else
  42. {
  43. var en = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
  44. try // REVIEW: Can use `await using` if we get pattern bind (HAS_AWAIT_USING_PATTERN_BIND)
  45. {
  46. if (await en.MoveNextAsync())
  47. {
  48. const int DefaultCapacity = 4;
  49. var arr = new T[DefaultCapacity];
  50. arr[0] = en.Current;
  51. var count = 1;
  52. while (await en.MoveNextAsync())
  53. {
  54. if (count == arr.Length)
  55. {
  56. // MaxArrayLength is defined in Array.MaxArrayLength and in gchelpers in CoreCLR.
  57. // It represents the maximum number of elements that can be in an array where
  58. // the size of the element is greater than one byte; a separate, slightly larger constant,
  59. // is used when the size of the element is one.
  60. const int MaxArrayLength = 0x7FEFFFFF;
  61. // This is the same growth logic as in List<T>:
  62. // If the array is currently empty, we make it a default size. Otherwise, we attempt to
  63. // double the size of the array. Doubling will overflow once the size of the array reaches
  64. // 2^30, since doubling to 2^31 is 1 larger than Int32.MaxValue. In that case, we instead
  65. // constrain the length to be MaxArrayLength (this overflow check works because of the
  66. // cast to uint). Because a slightly larger constant is used when T is one byte in size, we
  67. // could then end up in a situation where arr.Length is MaxArrayLength or slightly larger, such
  68. // that we constrain newLength to be MaxArrayLength but the needed number of elements is actually
  69. // larger than that. For that case, we then ensure that the newLength is large enough to hold
  70. // the desired capacity. This does mean that in the very rare case where we've grown to such a
  71. // large size, each new element added after MaxArrayLength will end up doing a resize.
  72. var newLength = count << 1;
  73. if ((uint)newLength > MaxArrayLength)
  74. {
  75. newLength = MaxArrayLength <= count ? count + 1 : MaxArrayLength;
  76. }
  77. Array.Resize(ref arr, newLength);
  78. }
  79. arr[count++] = en.Current;
  80. }
  81. result.Length = count;
  82. result.Array = arr;
  83. return result;
  84. }
  85. }
  86. finally
  87. {
  88. await en.DisposeAsync();
  89. }
  90. }
  91. result.Length = 0;
  92. #if NO_ARRAY_EMPTY
  93. result.Array = EmptyArray<T>.Value;
  94. #else
  95. result.Array = Array.Empty<T>();
  96. #endif
  97. return result;
  98. }
  99. internal static async Task<Set<T>> ToSet<T>(IAsyncEnumerable<T> source, IEqualityComparer<T> comparer, CancellationToken cancellationToken)
  100. {
  101. var set = new Set<T>(comparer);
  102. await foreach (T item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
  103. {
  104. set.Add(item);
  105. }
  106. return set;
  107. }
  108. internal struct ArrayWithLength<T>
  109. {
  110. public T[] Array;
  111. public int Length;
  112. }
  113. }
  114. }