Concat.cs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerableEx
  10. {
  /// <summary>
  /// Flattens an async-enumerable sequence of async-enumerable sequences by yielding the elements of each inner sequence in turn.
  /// The next inner sequence is only started once the previous one has run to completion without an error.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
  /// <param name="sources">Async-enumerable sequence whose elements are the inner async-enumerable sequences to concatenate.</param>
  /// <returns>An async-enumerable sequence that contains the elements of each inner sequence, in sequential order.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Concat<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      // Checked here, outside the iterator, so a null argument is reported when Concat itself is called.
      if (sources is null)
      {
          throw Error.ArgumentNull(nameof(sources));
      }

#if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core(sources);

      // The parameter keeps the name "sources" because the iterator body below is shared with the
      // #else variant, which captures the outer parameter instead of receiving it as an argument.
      static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<IAsyncEnumerable<TSource>> sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
#else
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
#endif
      {
          // The same token is handed to the outer enumeration and to every inner enumeration.
          await foreach (var inner in sources.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              await foreach (var element in inner.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return element;
              }
          }
      }
  }
  /// <summary>
  /// Concatenates the async-enumerable sequences produced by the given enumerable sequence, yielding the elements of each one in turn.
  /// The next sequence is only started once the previous one has run to completion without an error.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
  /// <param name="sources">Enumerable sequence of the async-enumerable sequences to concatenate.</param>
  /// <returns>An async-enumerable sequence that contains the elements of each given sequence, in sequential order.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Concat<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      // Checked here, outside the iterator, so a null argument is reported when Concat itself is called.
      if (sources is null)
      {
          throw Error.ArgumentNull(nameof(sources));
      }

#if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core(sources);

      // The parameter keeps the name "sources" because the iterator body below is shared with the
      // #else variant, which captures the outer parameter instead of receiving it as an argument.
      static async IAsyncEnumerable<TSource> Core(IEnumerable<IAsyncEnumerable<TSource>> sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
#else
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
#endif
      {
          // The outer sequence is walked synchronously; only the inner sequences are awaited,
          // each one receiving the caller's cancellation token.
          foreach (var inner in sources)
          {
              await foreach (var element in inner.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return element;
              }
          }
      }
  }
  /// <summary>
  /// Concatenates the specified async-enumerable sequences in argument order, yielding the elements of each one in turn.
  /// The next sequence is only started once the previous one has run to completion without an error.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
  /// <param name="sources">Array of the async-enumerable sequences to concatenate.</param>
  /// <returns>An async-enumerable sequence that contains the elements of each given sequence, in sequential order.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Concat<TSource>(params IAsyncEnumerable<TSource>[] sources)
  {
      // Checked here, outside the iterator, so a null array is reported when Concat itself is called.
      if (sources is null)
      {
          throw Error.ArgumentNull(nameof(sources));
      }

#if HAS_ASYNC_ENUMERABLE_CANCELLATION
      return Core(sources);

      // The parameter keeps the name "sources" because the iterator body below is shared with the
      // #else variant, which captures the outer parameter instead of receiving it as an argument.
      static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource>[] sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
#else
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
#endif
      {
          // Array elements are visited in index order; each sequence receives the caller's cancellation token.
          foreach (var inner in sources)
          {
              await foreach (var element in inner.WithCancellation(cancellationToken).ConfigureAwait(false))
              {
                  yield return element;
              }
          }
      }
  }
  95. }
  96. }