Repeat.cs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Threading.Tasks;
  9. namespace System.Reactive.Linq
  10. {
  11. public partial class AsyncObservable
  12. {
  13. public static IAsyncObservable<TSource> Repeat<TSource>(TSource value)
  14. {
  15. return Create<TSource>(observer => AsyncObserver.Repeat(observer, value));
  16. }
  17. public static IAsyncObservable<TSource> Repeat<TSource>(TSource value, IAsyncScheduler scheduler)
  18. {
  19. if (scheduler == null)
  20. throw new ArgumentNullException(nameof(scheduler));
  21. return Create<TSource>(observer => AsyncObserver.Repeat(observer, value, scheduler));
  22. }
  23. public static IAsyncObservable<TSource> Repeat<TSource>(TSource value, int repeatCount)
  24. {
  25. if (repeatCount < 0)
  26. throw new ArgumentNullException(nameof(repeatCount));
  27. return Create<TSource>(observer => AsyncObserver.Repeat(observer, value, repeatCount));
  28. }
  29. public static IAsyncObservable<TSource> Repeat<TSource>(TSource value, int repeatCount, IAsyncScheduler scheduler)
  30. {
  31. if (repeatCount < 0)
  32. throw new ArgumentNullException(nameof(repeatCount));
  33. if (scheduler == null)
  34. throw new ArgumentNullException(nameof(scheduler));
  35. return Create<TSource>(observer => AsyncObserver.Repeat(observer, value, repeatCount, scheduler));
  36. }
  37. public static IAsyncObservable<TSource> Repeat<TSource>(this IAsyncObservable<TSource> source)
  38. {
  39. if (source == null)
  40. throw new ArgumentNullException(nameof(source));
  41. return Create(source, static (source, observer) => AsyncObserver.Repeat(observer, source));
  42. }
  43. public static IAsyncObservable<TSource> Repeat<TSource>(this IAsyncObservable<TSource> source, int repeatCount)
  44. {
  45. if (source == null)
  46. throw new ArgumentNullException(nameof(source));
  47. if (repeatCount < 0)
  48. throw new ArgumentNullException(nameof(repeatCount));
  49. return CreateAsyncObservable<TSource>.From(
  50. source,
  51. repeatCount,
  52. static (source, repeatCount, observer) => AsyncObserver.Repeat(observer, source, repeatCount));
  53. }
  54. }
  55. public partial class AsyncObserver
  56. {
  57. public static ValueTask<IAsyncDisposable> Repeat<TSource>(IAsyncObserver<TSource> observer, TSource value)
  58. {
  59. if (observer == null)
  60. throw new ArgumentNullException(nameof(observer));
  61. return Repeat(observer, value, TaskPoolAsyncScheduler.Default);
  62. }
  63. public static ValueTask<IAsyncDisposable> Repeat<TSource>(IAsyncObserver<TSource> observer, TSource value, IAsyncScheduler scheduler)
  64. {
  65. if (observer == null)
  66. throw new ArgumentNullException(nameof(observer));
  67. if (scheduler == null)
  68. throw new ArgumentNullException(nameof(scheduler));
  69. return scheduler.ScheduleAsync(async ct =>
  70. {
  71. while (!ct.IsCancellationRequested)
  72. {
  73. await observer.OnNextAsync(value).RendezVous(scheduler, ct);
  74. }
  75. });
  76. }
  77. public static ValueTask<IAsyncDisposable> Repeat<TSource>(IAsyncObserver<TSource> observer, TSource value, int repeatCount)
  78. {
  79. if (observer == null)
  80. throw new ArgumentNullException(nameof(observer));
  81. if (repeatCount < 0)
  82. throw new ArgumentNullException(nameof(repeatCount));
  83. return Repeat(observer, value, repeatCount, TaskPoolAsyncScheduler.Default);
  84. }
  85. public static ValueTask<IAsyncDisposable> Repeat<TSource>(IAsyncObserver<TSource> observer, TSource value, int repeatCount, IAsyncScheduler scheduler)
  86. {
  87. if (observer == null)
  88. throw new ArgumentNullException(nameof(observer));
  89. if (repeatCount < 0)
  90. throw new ArgumentNullException(nameof(repeatCount));
  91. if (scheduler == null)
  92. throw new ArgumentNullException(nameof(scheduler));
  93. return scheduler.ScheduleAsync(async ct =>
  94. {
  95. var i = 0;
  96. while (!ct.IsCancellationRequested && i < repeatCount)
  97. {
  98. await observer.OnNextAsync(value).RendezVous(scheduler, ct);
  99. i++;
  100. }
  101. if (i == repeatCount)
  102. {
  103. await observer.OnCompletedAsync().RendezVous(scheduler, ct);
  104. }
  105. });
  106. }
  107. public static ValueTask<IAsyncDisposable> Repeat<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> source)
  108. {
  109. if (observer == null)
  110. throw new ArgumentNullException(nameof(observer));
  111. if (source == null)
  112. throw new ArgumentNullException(nameof(source));
  113. async ValueTask<IAsyncDisposable> CoreAsync()
  114. {
  115. var (sink, inner) = Concat(observer, Repeat(source).GetEnumerator());
  116. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  117. return StableCompositeAsyncDisposable.Create(subscription, inner);
  118. }
  119. return CoreAsync();
  120. }
  121. public static ValueTask<IAsyncDisposable> Repeat<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> source, int repeatCount)
  122. {
  123. if (observer == null)
  124. throw new ArgumentNullException(nameof(observer));
  125. if (source == null)
  126. throw new ArgumentNullException(nameof(source));
  127. if (repeatCount < 0)
  128. throw new ArgumentNullException(nameof(repeatCount));
  129. async ValueTask<IAsyncDisposable> CoreAsync()
  130. {
  131. var (sink, inner) = Concat(observer, Enumerable.Repeat(source, repeatCount).GetEnumerator());
  132. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  133. return StableCompositeAsyncDisposable.Create(subscription, inner);
  134. }
  135. return CoreAsync();
  136. }
  137. private static IEnumerable<T> Repeat<T>(T value)
  138. {
  139. while (true)
  140. {
  141. yield return value;
  142. }
  143. }
  144. }
  145. }