1
0

HistoricalScheduler.cs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Concurrency
  7. {
  8. /// <summary>
  9. /// Base class for historical schedulers, which are virtual time schedulers that use DateTimeOffset for absolute time and TimeSpan for relative time.
  10. /// </summary>
  11. public abstract class HistoricalSchedulerBase : VirtualTimeSchedulerBase<DateTimeOffset, TimeSpan>
  12. {
  13. /// <summary>
  14. /// Creates a new historical scheduler with the minimum value of DateTimeOffset as the initial clock value.
  15. /// </summary>
  16. protected HistoricalSchedulerBase()
  17. : base(DateTimeOffset.MinValue, Comparer<DateTimeOffset>.Default)
  18. {
  19. }
  20. /// <summary>
  21. /// Creates a new historical scheduler with the specified initial clock value.
  22. /// </summary>
  23. /// <param name="initialClock">Initial clock value.</param>
  24. protected HistoricalSchedulerBase(DateTimeOffset initialClock)
  25. : base(initialClock, Comparer<DateTimeOffset>.Default)
  26. {
  27. }
  28. /// <summary>
  29. /// Creates a new historical scheduler with the specified initial clock value and absolute time comparer.
  30. /// </summary>
  31. /// <param name="initialClock">Initial value for the clock.</param>
  32. /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param>
  33. protected HistoricalSchedulerBase(DateTimeOffset initialClock, IComparer<DateTimeOffset> comparer)
  34. : base(initialClock, comparer)
  35. {
  36. }
  37. /// <summary>
  38. /// Adds a relative time value to an absolute time value.
  39. /// </summary>
  40. /// <param name="absolute">Absolute time value.</param>
  41. /// <param name="relative">Relative time value to add.</param>
  42. /// <returns>The resulting absolute time sum value.</returns>
  43. protected override DateTimeOffset Add(DateTimeOffset absolute, TimeSpan relative)
  44. {
  45. return absolute.Add(relative);
  46. }
  47. /// <summary>
  48. /// Converts the absolute time value to a DateTimeOffset value.
  49. /// </summary>
  50. /// <param name="absolute">Absolute time value to convert.</param>
  51. /// <returns>The corresponding DateTimeOffset value.</returns>
  52. protected override DateTimeOffset ToDateTimeOffset(DateTimeOffset absolute)
  53. {
  54. return absolute;
  55. }
  56. /// <summary>
  57. /// Converts the TimeSpan value to a relative time value.
  58. /// </summary>
  59. /// <param name="timeSpan">TimeSpan value to convert.</param>
  60. /// <returns>The corresponding relative time value.</returns>
  61. protected override TimeSpan ToRelative(TimeSpan timeSpan)
  62. {
  63. return timeSpan;
  64. }
  65. }
  66. /// <summary>
  67. /// Provides a virtual time scheduler that uses DateTimeOffset for absolute time and TimeSpan for relative time.
  68. /// </summary>
  69. public class HistoricalScheduler : HistoricalSchedulerBase
  70. {
  71. private readonly SchedulerQueue<DateTimeOffset> queue = new SchedulerQueue<DateTimeOffset>();
  72. /// <summary>
  73. /// Creates a new historical scheduler with the minimum value of DateTimeOffset as the initial clock value.
  74. /// </summary>
  75. public HistoricalScheduler()
  76. : base()
  77. {
  78. }
  79. /// <summary>
  80. /// Creates a new historical scheduler with the specified initial clock value.
  81. /// </summary>
  82. /// <param name="initialClock">Initial value for the clock.</param>
  83. public HistoricalScheduler(DateTimeOffset initialClock)
  84. : base(initialClock)
  85. {
  86. }
  87. /// <summary>
  88. /// Creates a new historical scheduler with the specified initial clock value.
  89. /// </summary>
  90. /// <param name="initialClock">Initial value for the clock.</param>
  91. /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param>
  92. /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is null.</exception>
  93. public HistoricalScheduler(DateTimeOffset initialClock, IComparer<DateTimeOffset> comparer)
  94. : base(initialClock, comparer)
  95. {
  96. }
  97. /// <summary>
  98. /// Gets the next scheduled item to be executed.
  99. /// </summary>
  100. /// <returns>The next scheduled item.</returns>
  101. protected override IScheduledItem<DateTimeOffset> GetNext()
  102. {
  103. while (queue.Count > 0)
  104. {
  105. var next = queue.Peek();
  106. if (next.IsCanceled)
  107. queue.Dequeue();
  108. else
  109. return next;
  110. }
  111. return null;
  112. }
  113. /// <summary>
  114. /// Schedules an action to be executed at dueTime.
  115. /// </summary>
  116. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  117. /// <param name="state">State passed to the action to be executed.</param>
  118. /// <param name="action">Action to be executed.</param>
  119. /// <param name="dueTime">Absolute time at which to execute the action.</param>
  120. /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  121. /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  122. public override IDisposable ScheduleAbsolute<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
  123. {
  124. if (action == null)
  125. throw new ArgumentNullException(nameof(action));
  126. var si = default(ScheduledItem<DateTimeOffset, TState>);
  127. var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) =>
  128. {
  129. queue.Remove(si);
  130. return action(scheduler, state1);
  131. });
  132. si = new ScheduledItem<DateTimeOffset, TState>(this, state, run, dueTime, Comparer);
  133. queue.Enqueue(si);
  134. return Disposable.Create(si.Cancel);
  135. }
  136. }
  137. }