Repeat.cs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  /// <summary>
  /// Producer for a sequence that pushes one fixed value to its observer, either a
  /// specified number of times followed by completion, or without end when no count
  /// is given. Emissions are driven by the scheduler supplied at construction.
  /// </summary>
  /// <typeparam name="TResult">Type of the value that is repeated.</typeparam>
  class Repeat<TResult> : Producer<TResult>
  {
    private readonly TResult _value;
    private readonly int? _repeatCount;
    private readonly IScheduler _scheduler;

    /// <summary>
    /// Captures the value to repeat, the optional repetition count and the scheduler.
    /// </summary>
    /// <param name="value">Value handed to the observer on every emission.</param>
    /// <param name="repeatCount">
    /// Number of emissions before completion; <c>null</c> selects the unbounded loops.
    /// </param>
    /// <param name="scheduler">Scheduler on which the emission loop is scheduled.</param>
    public Repeat(TResult value, int? repeatCount, IScheduler scheduler)
    {
      _value = value;
      _repeatCount = repeatCount;
      _scheduler = scheduler;
    }

    /// <summary>
    /// Creates the sink for one subscription, publishes it through
    /// <paramref name="setSink"/>, and starts it.
    /// </summary>
    /// <param name="observer">Observer that receives the repeated value.</param>
    /// <param name="cancel">Disposable forwarded to the sink's base constructor.</param>
    /// <param name="setSink">Callback that is given the newly created sink.</param>
    /// <returns>The disposable returned by the sink's <c>Run</c> method.</returns>
    protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
    {
      var sink = new _(this, observer, cancel);
      setSink(sink);
      return sink.Run();
    }

    /// <summary>
    /// Per-subscription sink that performs the actual emission loop.
    /// </summary>
    class _ : Sink<TResult>
    {
      private readonly Repeat<TResult> _parent;

      /// <summary>
      /// Binds the sink to its parent producer; <paramref name="observer"/> and
      /// <paramref name="cancel"/> are passed to the base sink.
      /// </summary>
      public _(Repeat<TResult> parent, IObserver<TResult> observer, IDisposable cancel)
        : base(observer, cancel)
      {
        _parent = parent;
      }

      /// <summary>
      /// Schedules the emission loop, using the long-running variant when
      /// <c>AsLongRunning()</c> yields a non-null scheduler and the recursive
      /// variant otherwise.
      /// </summary>
      /// <returns>The disposable returned by the scheduling call.</returns>
      public IDisposable Run()
      {
        var longRunning = _parent._scheduler.AsLongRunning();
        if (longRunning != null)
        {
          return Run(longRunning);
        }

        return Run(_parent._scheduler);
      }

      /// <summary>
      /// Recursive-scheduling path: schedules <see cref="LoopRecInf"/> when no count
      /// was given, otherwise <see cref="LoopRec"/> with the count as initial state.
      /// </summary>
      private IDisposable Run(IScheduler scheduler)
      {
        if (_parent._repeatCount == null)
        {
          return scheduler.Schedule(LoopRecInf);
        }

        return scheduler.Schedule(_parent._repeatCount.Value, LoopRec);
      }

      /// <summary>
      /// One step of the unbounded recursive loop: emit the value, then request the
      /// next step through <paramref name="recurse"/>.
      /// </summary>
      private void LoopRecInf(Action recurse)
      {
        base._observer.OnNext(_parent._value);
        recurse();
      }

      /// <summary>
      /// One step of the counted recursive loop.
      /// </summary>
      /// <param name="n">Number of emissions still owed when this step starts.</param>
      /// <param name="recurse">Requests another step with the updated remaining count.</param>
      private void LoopRec(int n, Action<int> recurse)
      {
        if (n > 0)
        {
          base._observer.OnNext(_parent._value);
          n--;
        }

        // `<= 0` instead of `== 0`: a negative count must terminate here, exactly as
        // Loop() does on the long-running path. With `== 0` a negative count would
        // never emit, never complete, and reschedule itself indefinitely.
        if (n <= 0)
        {
          base._observer.OnCompleted();
          base.Dispose();
          return;
        }

        recurse(n);
      }

      /// <summary>
      /// Long-running path: schedules <see cref="LoopInf"/> when no count was given,
      /// otherwise <see cref="Loop"/> with the count as initial state.
      /// </summary>
      private IDisposable Run(ISchedulerLongRunning scheduler)
      {
        if (_parent._repeatCount == null)
        {
          return scheduler.ScheduleLongRunning(LoopInf);
        }

        return scheduler.ScheduleLongRunning(_parent._repeatCount.Value, Loop);
      }

      /// <summary>
      /// Unbounded long-running loop: emits the value until <paramref name="cancel"/>
      /// reports that it has been disposed, then disposes the sink.
      /// </summary>
      private void LoopInf(ICancelable cancel)
      {
        // Read the value once; it is a readonly field and cannot change mid-loop.
        var value = _parent._value;

        while (!cancel.IsDisposed)
        {
          base._observer.OnNext(value);
        }

        base.Dispose();
      }

      /// <summary>
      /// Counted long-running loop: emits the value up to <paramref name="n"/> times,
      /// stopping early if <paramref name="cancel"/> is disposed.
      /// </summary>
      /// <param name="n">Number of emissions to perform.</param>
      /// <param name="cancel">Cancellation flag polled before every emission.</param>
      private void Loop(int n, ICancelable cancel)
      {
        var value = _parent._value;

        while (n > 0 && !cancel.IsDisposed)
        {
          base._observer.OnNext(value);
          n--;
        }

        // Completion is signalled only when the loop was not cut short by disposal.
        if (!cancel.IsDisposed)
        {
          base._observer.OnCompleted();
        }

        base.Dispose();
      }
    }
  }
  108. }
  109. #endif