Repeat.cs 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. internal static class Repeat<TResult>
  9. {
  10. internal sealed class ForeverRecursive : Producer<TResult, ForeverRecursive._>
  11. {
  12. private readonly TResult _value;
  13. private readonly IScheduler _scheduler;
  14. public ForeverRecursive(TResult value, IScheduler scheduler)
  15. {
  16. _value = value;
  17. _scheduler = scheduler;
  18. }
  19. protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);
  20. protected override void Run(_ sink) => sink.Run(_scheduler);
  21. internal sealed class _ : IdentitySink<TResult>
  22. {
  23. private readonly TResult _value;
  24. private IDisposable _task;
  25. public _(TResult value, IObserver<TResult> observer)
  26. : base(observer)
  27. {
  28. _value = value;
  29. }
  30. public void Run(IScheduler scheduler)
  31. {
  32. var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
  33. Disposable.TrySetSingle(ref _task, first);
  34. }
  35. protected override void Dispose(bool disposing)
  36. {
  37. base.Dispose(disposing);
  38. if (disposing)
  39. {
  40. Disposable.TryDispose(ref _task);
  41. }
  42. }
  43. private IDisposable LoopRecInf(IScheduler scheduler)
  44. {
  45. ForwardOnNext(_value);
  46. var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
  47. Disposable.TrySetMultiple(ref _task, next);
  48. return Disposable.Empty;
  49. }
  50. }
  51. }
  52. internal sealed class ForeverLongRunning : Producer<TResult, ForeverLongRunning._>
  53. {
  54. private readonly TResult _value;
  55. private readonly ISchedulerLongRunning _scheduler;
  56. public ForeverLongRunning(TResult value, ISchedulerLongRunning scheduler)
  57. {
  58. _value = value;
  59. _scheduler = scheduler;
  60. }
  61. protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);
  62. protected override void Run(_ sink) => sink.Run(_scheduler);
  63. internal sealed class _ : IdentitySink<TResult>
  64. {
  65. private readonly TResult _value;
  66. public _(TResult value, IObserver<TResult> observer)
  67. : base(observer)
  68. {
  69. _value = value;
  70. }
  71. public void Run(ISchedulerLongRunning longRunning)
  72. {
  73. SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.LoopInf(c)));
  74. }
  75. private void LoopInf(ICancelable cancel)
  76. {
  77. var value = _value;
  78. while (!cancel.IsDisposed)
  79. {
  80. ForwardOnNext(value);
  81. }
  82. Dispose();
  83. }
  84. }
  85. }
  86. internal sealed class CountRecursive : Producer<TResult, CountRecursive._>
  87. {
  88. private readonly TResult _value;
  89. private readonly IScheduler _scheduler;
  90. private readonly int _repeatCount;
  91. public CountRecursive(TResult value, int repeatCount, IScheduler scheduler)
  92. {
  93. _value = value;
  94. _scheduler = scheduler;
  95. _repeatCount = repeatCount;
  96. }
  97. protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, _repeatCount, observer);
  98. protected override void Run(_ sink) => sink.Run(_scheduler);
  99. internal sealed class _ : IdentitySink<TResult>
  100. {
  101. private readonly TResult _value;
  102. private int _remaining;
  103. private IDisposable _task;
  104. public _(TResult value, int repeatCount, IObserver<TResult> observer)
  105. : base(observer)
  106. {
  107. _value = value;
  108. _remaining = repeatCount;
  109. }
  110. public void Run(IScheduler scheduler)
  111. {
  112. var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
  113. Disposable.TrySetSingle(ref _task, first);
  114. }
  115. protected override void Dispose(bool disposing)
  116. {
  117. base.Dispose(disposing);
  118. if (disposing)
  119. {
  120. Disposable.TryDispose(ref _task);
  121. }
  122. }
  123. private IDisposable LoopRec(IScheduler scheduler)
  124. {
  125. var remaining = _remaining;
  126. if (remaining > 0)
  127. {
  128. ForwardOnNext(_value);
  129. _remaining = --remaining;
  130. }
  131. if (remaining == 0)
  132. {
  133. ForwardOnCompleted();
  134. }
  135. else
  136. {
  137. var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
  138. Disposable.TrySetMultiple(ref _task, next);
  139. }
  140. return Disposable.Empty;
  141. }
  142. }
  143. }
  144. internal sealed class CountLongRunning : Producer<TResult, CountLongRunning._>
  145. {
  146. private readonly TResult _value;
  147. private readonly ISchedulerLongRunning _scheduler;
  148. private readonly int _repeatCount;
  149. public CountLongRunning(TResult value, int repeatCount, ISchedulerLongRunning scheduler)
  150. {
  151. _value = value;
  152. _scheduler = scheduler;
  153. _repeatCount = repeatCount;
  154. }
  155. protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, _repeatCount, observer);
  156. protected override void Run(_ sink) => sink.Run(_scheduler);
  157. internal sealed class _ : IdentitySink<TResult>
  158. {
  159. private readonly TResult _value;
  160. private readonly int _remaining;
  161. public _(TResult value, int remaining, IObserver<TResult> observer)
  162. : base(observer)
  163. {
  164. _value = value;
  165. _remaining = remaining;
  166. }
  167. public void Run(ISchedulerLongRunning longRunning)
  168. {
  169. SetUpstream(longRunning.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel)));
  170. }
  171. private void Loop(ICancelable cancel)
  172. {
  173. var value = _value;
  174. var n = _remaining;
  175. while (n > 0 && !cancel.IsDisposed)
  176. {
  177. ForwardOnNext(value);
  178. n--;
  179. }
  180. if (!cancel.IsDisposed)
  181. {
  182. ForwardOnCompleted();
  183. }
  184. }
  185. }
  186. }
  187. }
  188. }