// Program.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Reactive;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Linq;
  6. using System.Reactive.Subjects;
  7. namespace HistoricalScheduling
  8. {
  /// <summary>
  /// Console demo: replays a twelve-entry timestamped log through
  /// <see cref="LogScheduler{T}"/> and prints the output of four queries.
  /// </summary>
  class Program
  {
      /// <summary>
      /// Wraps <see cref="GetLog"/> in a <see cref="LogScheduler{T}"/>, subscribes the
      /// demo queries to the replayed values and lets the scheduler run them.
      /// </summary>
      /// <param name="args">Command-line arguments; not used.</param>
      static void Main(string[] args)
      {
          var replay = new LogScheduler<int>(GetLog());

          replay.Process((values, scheduler) =>
          {
              // Query 0: every replayed value, timestamped using the scheduler.
              values
                  .Timestamp(scheduler)
                  .Subscribe(stamped => Console.WriteLine("0> " + stamped));

              // Query 1: same as query 0, restricted to odd values.
              values
                  .Where(v => v % 2 != 0)
                  .Timestamp(scheduler)
                  .Subscribe(stamped => Console.WriteLine("1> " + stamped));

              // Query 2: how many values landed in each 63-day buffer.
              var bufferSpan = TimeSpan.FromDays(63);
              values
                  .Buffer(bufferSpan, scheduler)
                  .Select(batch => batch.Count)
                  .Timestamp(scheduler)
                  .Subscribe(stamped => Console.WriteLine("2> " + stamped));

              // Query 3: a one-day interval on the same scheduler, cut off at
              // 2013-01-01 12:00 (+00:00), reporting the scheduler's clock on each tick.
              var tickPeriod = TimeSpan.FromDays(1);
              var cutoff = new DateTimeOffset(2013, 1, 1, 12, 0, 0, TimeSpan.Zero);
              Observable
                  .Interval(tickPeriod, scheduler)
                  .TakeUntil(cutoff, scheduler)
                  .Select(_ => scheduler.Clock)
                  .Subscribe(now => PrintClock(now));

              //
              // If the end of the log should cause the scheduler to stop, add the following line:
              //
              // values.Subscribe(_ => { }, scheduler.Stop);
          });
      }

      /// <summary>
      /// Writes the given scheduler time to the console in green, then restores
      /// the default console colors.
      /// </summary>
      /// <param name="now">Clock value to report.</param>
      static void PrintClock(DateTimeOffset now)
      {
          Console.ForegroundColor = ConsoleColor.Green;
          Console.WriteLine("It's now " + now);
          Console.ResetColor();
      }

      /// <summary>
      /// Lazily yields twelve log entries, one per month of 2012 (day 1, 12:00, offset zero),
      /// each carrying the square of its month number. Every entry is echoed to the
      /// console in yellow at the moment it is produced.
      /// </summary>
      /// <returns>The timestamped entries in month order.</returns>
      static IEnumerable<Timestamped<int>> GetLog()
      {
          const int firstMonth = 1;
          const int lastMonth = 12;

          for (int month = firstMonth; month <= lastMonth; month++)
          {
              var stamp = new DateTimeOffset(2012, month, 1, 12, 0, 0, TimeSpan.Zero);
              var squared = month * month;

              Console.ForegroundColor = ConsoleColor.Yellow;
              Console.WriteLine("Log read for {0} - Value = {1}", stamp, squared);
              Console.ResetColor();

              yield return new Timestamped<int>(squared, stamp);
          }
      }
  }
  /// <summary>
  /// Replays a sequence of timestamped values as an observable driven by a
  /// <see cref="HistoricalScheduler"/>, so that time-based queries run against the
  /// timestamps recorded in the log.
  /// </summary>
  /// <typeparam name="T">Type of the logged values.</typeparam>
  class LogScheduler<T>
  {
      private readonly IEnumerable<Timestamped<T>> _source;

      /// <summary>
      /// Creates a log scheduler over the given timestamped sequence.
      /// </summary>
      /// <param name="source">Log entries to replay; enumerated once per <see cref="Process"/> call.</param>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
      public LogScheduler(IEnumerable<Timestamped<T>> source)
      {
          if (source == null)
          {
              throw new ArgumentNullException("source");
          }

          _source = source;
      }

      /// <summary>
      /// Enumerates the log, hands the replayed observable and its scheduler to
      /// <paramref name="query"/> so it can subscribe, then starts the scheduler.
      /// </summary>
      /// <param name="query">
      /// Callback that receives the replayed values and the scheduler they are delivered on.
      /// </param>
      /// <remarks>
      /// The log enumerator is disposed when this method returns, whether it returns
      /// normally, the query or an observer throws, or the scheduler was stopped before
      /// the log was exhausted. A scheduler kept by the query therefore cannot be used
      /// to read further log entries afterwards.
      /// </remarks>
      /// <exception cref="ArgumentNullException"><paramref name="query"/> is null.</exception>
      public void Process(Action<IObservable<T>, HistoricalScheduler> query)
      {
          if (query == null)
          {
              throw new ArgumentNullException("query");
          }

          // `using` guarantees the enumerator is released on every exit path; the
          // scheduler only disposes it itself when the log ends or enumeration throws.
          using (var enumerator = _source.GetEnumerator())
          {
              var scheduler = new Scheduler(enumerator);
              query(scheduler.Source, scheduler);
              scheduler.Start();
          }
      }

      /// <summary>
      /// Historical scheduler that pulls entries from the log one at a time: each
      /// delivered entry schedules the read of the next one.
      /// </summary>
      private sealed class Scheduler : HistoricalScheduler
      {
          private readonly Subject<T> _subject = new Subject<T>();
          private readonly IEnumerator<Timestamped<T>> _enumerator;

          /// <summary>
          /// Creates a scheduler whose clock is advanced to the timestamp of the first log entry.
          /// </summary>
          /// <param name="enumerator">Enumerator over the log entries.</param>
          public Scheduler(IEnumerator<Timestamped<T>> enumerator)
          {
              _enumerator = enumerator;
              MoveNext(true);
          }

          /// <summary>
          /// Creates a scheduler whose clock is advanced to an explicit start time
          /// before the first log entry is read.
          /// </summary>
          /// <param name="enumerator">Enumerator over the log entries.</param>
          /// <param name="startTime">Time to advance the clock to.</param>
          public Scheduler(IEnumerator<Timestamped<T>> enumerator, DateTimeOffset startTime)
          {
              _enumerator = enumerator;
              this.AdvanceTo(startTime);
              MoveNext();
          }

          /// <summary>
          /// Reads the next log entry and schedules its delivery at the entry's timestamp;
          /// when the log has no more entries, schedules completion of the source instead.
          /// </summary>
          /// <param name="initializeInitialTimeFromLog">
          /// When true, the clock is first advanced to the timestamp of the entry just read.
          /// </param>
          public void MoveNext(bool initializeInitialTimeFromLog = false)
          {
              Timestamped<T> nextLog;
              if (TryMoveNext(out nextLog))
              {
                  if (initializeInitialTimeFromLog)
                  {
                      this.AdvanceTo(nextLog.Timestamp);
                  }

                  ScheduleOnNext(nextLog);
              }
              else
              {
                  this.Schedule(_subject.OnCompleted);
              }
          }

          /// <summary>
          /// The replayed log values, exposed through <c>AsObservable()</c> so the
          /// underlying subject is not handed out directly.
          /// </summary>
          public IObservable<T> Source
          {
              get { return _subject.AsObservable(); }
          }

          /// <summary>
          /// Advances the enumerator by one entry.
          /// </summary>
          /// <param name="value">The entry read, or the default value when the log is exhausted.</param>
          /// <returns>True when an entry was read; false when the log has ended.</returns>
          private bool TryMoveNext(out Timestamped<T> value)
          {
              try
              {
                  if (_enumerator.MoveNext())
                  {
                      value = _enumerator.Current;
                      return true;
                  }
              }
              catch
              {
                  // Release the log before letting the enumeration failure propagate.
                  _enumerator.Dispose();
                  throw;
              }

              // End of log: release the enumerator right away.
              _enumerator.Dispose();
              value = default(Timestamped<T>);
              return false;
          }

          /// <summary>
          /// Schedules delivery of <paramref name="value"/> at its own timestamp; once
          /// delivered, the next entry is read and scheduled.
          /// </summary>
          /// <param name="value">Log entry to deliver.</param>
          private void ScheduleOnNext(Timestamped<T> value)
          {
              this.Schedule(value.Timestamp, () =>
              {
                  _subject.OnNext(value.Value);
                  MoveNext();
              });
          }
      }
  }
  124. }