Expand.cs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  namespace System.Linq
  {
      public static partial class AsyncEnumerableEx
      {
          /// <summary>
          /// Recursively expands an async-enumerable sequence in breadth-first order. Every element that is produced is handed
          /// to <paramref name="selector"/>, and the sequence it returns is enumerated after all sequences queued before it.
          /// </summary>
          /// <typeparam name="TSource">Type of the elements in the source sequence and in every expansion.</typeparam>
          /// <param name="source">Async-enumerable sequence that seeds the expansion.</param>
          /// <param name="selector">Function that maps an element to the next sequence to expand.</param>
          /// <returns>Async-enumerable sequence that yields each visited element, starting with the elements of <paramref name="source"/>.</returns>
          /// <remarks>
          /// Both arguments are null-checked when this method is called; the expansion itself runs lazily during enumeration.
          /// </remarks>
          public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TSource>> selector)
          {
              if (source is null)
              {
                  throw Error.ArgumentNull(nameof(source));
              }

              if (selector is null)
              {
                  throw Error.ArgumentNull(nameof(selector));
              }

              return Core(source, selector);

              static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TSource>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
              {
                  // First-in, first-out work list: this is what makes the traversal breadth first.
                  var pending = new Queue<IAsyncEnumerable<TSource>>();
                  pending.Enqueue(source);

                  while (pending.Count != 0)
                  {
                      var current = pending.Dequeue();

                      await foreach (var element in current.WithCancellation(cancellationToken).ConfigureAwait(false))
                      {
                          // The expansion is requested and queued before the element is handed to the consumer.
                          var expansion = selector(element);
                          pending.Enqueue(expansion);

                          yield return element;
                      }
                  }
              }
          }

          /// <summary>
          /// Recursively expands an async-enumerable sequence in breadth-first order, using an asynchronous selector. Every element
          /// that is produced is handed to <paramref name="selector"/>, whose awaited result is enumerated after all sequences
          /// queued before it.
          /// </summary>
          /// <typeparam name="TSource">Type of the elements in the source sequence and in every expansion.</typeparam>
          /// <param name="source">Async-enumerable sequence that seeds the expansion.</param>
          /// <param name="selector">Asynchronous function that maps an element to the next sequence to expand.</param>
          /// <returns>Async-enumerable sequence that yields each visited element, starting with the elements of <paramref name="source"/>.</returns>
          /// <remarks>
          /// Both arguments are null-checked when this method is called; the expansion itself runs lazily during enumeration.
          /// </remarks>
          public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TSource>>> selector)
          {
              if (source is null)
              {
                  throw Error.ArgumentNull(nameof(source));
              }

              if (selector is null)
              {
                  throw Error.ArgumentNull(nameof(selector));
              }

              return Core(source, selector);

              static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TSource>>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
              {
                  // First-in, first-out work list: this is what makes the traversal breadth first.
                  var pending = new Queue<IAsyncEnumerable<TSource>>();
                  pending.Enqueue(source);

                  while (pending.Count != 0)
                  {
                      var current = pending.Dequeue();

                      await foreach (var element in current.WithCancellation(cancellationToken).ConfigureAwait(false))
                      {
                          // The selector is awaited to completion and its result queued before the element is yielded.
                          var expansion = await selector(element).ConfigureAwait(false);
                          pending.Enqueue(expansion);

                          yield return element;
                      }
                  }
              }
          }

#if !NO_DEEP_CANCELLATION
          /// <summary>
          /// Recursively expands an async-enumerable sequence in breadth-first order, using an asynchronous, cancellable selector.
          /// Every element that is produced is handed to <paramref name="selector"/> together with the enumeration's cancellation
          /// token, and the awaited result is enumerated after all sequences queued before it.
          /// </summary>
          /// <typeparam name="TSource">Type of the elements in the source sequence and in every expansion.</typeparam>
          /// <param name="source">Async-enumerable sequence that seeds the expansion.</param>
          /// <param name="selector">Asynchronous, cancellable function that maps an element to the next sequence to expand.</param>
          /// <returns>Async-enumerable sequence that yields each visited element, starting with the elements of <paramref name="source"/>.</returns>
          /// <remarks>
          /// Both arguments are null-checked when this method is called; the expansion itself runs lazily during enumeration.
          /// </remarks>
          public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> selector)
          {
              if (source is null)
              {
                  throw Error.ArgumentNull(nameof(source));
              }

              if (selector is null)
              {
                  throw Error.ArgumentNull(nameof(selector));
              }

              return Core(source, selector);

              static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
              {
                  // First-in, first-out work list: this is what makes the traversal breadth first.
                  var pending = new Queue<IAsyncEnumerable<TSource>>();
                  pending.Enqueue(source);

                  while (pending.Count != 0)
                  {
                      var current = pending.Dequeue();

                      await foreach (var element in current.WithCancellation(cancellationToken).ConfigureAwait(false))
                      {
                          // The same token that governs the enumeration is forwarded to the selector.
                          var expansion = await selector(element, cancellationToken).ConfigureAwait(false);
                          pending.Enqueue(expansion);

                          yield return element;
                      }
                  }
              }
          }
#endif
      }
  }