Repeat.cs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  namespace System.Linq
  {
      // Repeat operators: build async sequences that yield a single element, or replay a
      // whole source sequence, either a fixed number of times or without an upper bound.
      public static partial class AsyncEnumerable
      {
          /// <summary>
          /// Creates an async sequence that contains <paramref name="element"/> repeated
          /// <paramref name="count"/> times, by wrapping <c>Enumerable.Repeat</c> with
          /// <c>ToAsyncEnumerable</c>.
          /// </summary>
          /// <typeparam name="TResult">Type of the repeated element.</typeparam>
          /// <param name="element">The value to repeat.</param>
          /// <param name="count">How many times to repeat the value.</param>
          /// <returns>The async sequence produced by <c>ToAsyncEnumerable</c>.</returns>
          /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
          public static IAsyncEnumerable<TResult> Repeat<TResult>(TResult element, int count)
          {
              if (count < 0)
              {
                  throw new ArgumentOutOfRangeException(nameof(count));
              }

              return Enumerable.Repeat(element, count).ToAsyncEnumerable();
          }

          /// <summary>
          /// Creates an async sequence whose enumerators answer every move-next request with
          /// <c>TaskExt.True</c> and always expose <paramref name="element"/> as the current value.
          /// </summary>
          /// <typeparam name="TResult">Type of the repeated element.</typeparam>
          /// <param name="element">The value exposed as the current element.</param>
          /// <returns>An async sequence built through <c>Create</c>.</returns>
          public static IAsyncEnumerable<TResult> Repeat<TResult>(TResult element)
          {
              // NOTE(review): TaskExt.True is presumably a completed task holding `true`, which
              // would make this sequence endless - confirm against TaskExt.
              // The dispose callback is intentionally empty: nothing is acquired here.
              return Create(() => Create(
                  ct => TaskExt.True,
                  () => element,
                  () => { }
              ));
          }

          /// <summary>
          /// Creates an async sequence that enumerates <paramref name="source"/> from start to end
          /// <paramref name="count"/> times in a row.
          /// </summary>
          /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
          /// <param name="source">The sequence to replay; a fresh enumerator is requested for each pass.</param>
          /// <param name="count">Number of passes over <paramref name="source"/>.</param>
          /// <returns>An async sequence built through <c>Create</c>.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
          /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
          public static IAsyncEnumerable<TSource> Repeat<TSource>(this IAsyncEnumerable<TSource> source, int count)
          {
              if (source == null)
              {
                  throw new ArgumentNullException(nameof(source));
              }

              if (count < 0)
              {
                  throw new ArgumentOutOfRangeException(nameof(count));
              }

              return Create(() =>
              {
                  // Per-enumeration state, captured by the lambdas below.
                  var e = default(IAsyncEnumerator<TSource>);   // enumerator of the pass in progress; null between passes
                  var a = new AssignableDisposable();
                  var n = count;                                // passes that have not been started yet
                  var current = default(TSource);
                  // The token source is only handed to the composite disposable; nothing in this
                  // method reads a token from it.
                  var cts = new CancellationTokenDisposable();
                  var d = Disposable.Create(cts, a);

                  Func<CancellationToken, Task<bool>> f = async ct =>
                  {
                      // Loop instead of re-invoking the delegate: when a pass ends (or the source
                      // is empty) the next pass starts in the same frame, so stack depth does not
                      // grow with the number of passes.
                      while (true)
                      {
                          if (e == null)
                          {
                              // Test before decrementing so the counter stays at zero once all
                              // passes are used up; further calls then keep returning false
                              // instead of wrapping below zero and re-opening the source.
                              if (n == 0)
                              {
                                  return false;
                              }

                              n--;
                              e = source.GetEnumerator();
                              // NOTE(review): presumably releases the previously assigned
                              // enumerator - confirm against AssignableDisposable.
                              a.Disposable = e;
                          }

                          if (await e.MoveNext(ct).ConfigureAwait(false))
                          {
                              current = e.Current;
                              return true;
                          }

                          // Pass finished: clear the enumerator so the next iteration opens a new one.
                          e = null;
                      }
                  };

                  // NOTE(review): `e` is still null here because `f` has not run yet, so the
                  // last argument passes a null reference - confirm that Create tolerates it.
                  return Create(
                      f,
                      () => current,
                      d.Dispose,
                      e
                  );
              });
          }

          /// <summary>
          /// Creates an async sequence that enumerates <paramref name="source"/> from start to end
          /// over and over, with no upper bound on the number of passes.
          /// </summary>
          /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
          /// <param name="source">The sequence to replay; a fresh enumerator is requested for each pass.</param>
          /// <returns>An async sequence built through <c>Create</c>.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
          public static IAsyncEnumerable<TSource> Repeat<TSource>(this IAsyncEnumerable<TSource> source)
          {
              if (source == null)
              {
                  throw new ArgumentNullException(nameof(source));
              }

              return Create(() =>
              {
                  // Per-enumeration state, captured by the lambdas below.
                  var e = default(IAsyncEnumerator<TSource>);   // enumerator of the pass in progress; null between passes
                  var a = new AssignableDisposable();
                  var current = default(TSource);
                  // The token source is only handed to the composite disposable; nothing in this
                  // method reads a token from it.
                  var cts = new CancellationTokenDisposable();
                  var d = Disposable.Create(cts, a);

                  Func<CancellationToken, Task<bool>> f = async ct =>
                  {
                      // Loop instead of re-invoking the delegate: an empty source would otherwise
                      // nest one call per restart without bound.
                      while (true)
                      {
                          if (e == null)
                          {
                              e = source.GetEnumerator();
                              // NOTE(review): presumably releases the previously assigned
                              // enumerator - confirm against AssignableDisposable.
                              a.Disposable = e;
                          }

                          if (await e.MoveNext(ct).ConfigureAwait(false))
                          {
                              current = e.Current;
                              return true;
                          }

                          // Pass finished: clear the enumerator so the next iteration opens a new one.
                          e = null;
                      }
                  };

                  // NOTE(review): `e` is still null here because `f` has not run yet, so the
                  // last argument passes a null reference - confirm that Create tolerates it.
                  return Create(
                      f,
                      () => current,
                      d.Dispose,
                      e
                  );
              });
          }
      }
  }