HistoricalScheduler.cs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. #nullable disable
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. namespace System.Reactive.Concurrency
  8. {
  /// <summary>
  /// Abstract foundation for historical schedulers: virtual time schedulers whose absolute time is expressed as
  /// <see cref="DateTimeOffset"/> and whose relative time is expressed as <see cref="TimeSpan"/>.
  /// </summary>
  public abstract class HistoricalSchedulerBase : VirtualTimeSchedulerBase<DateTimeOffset, TimeSpan>
  {
      /// <summary>
      /// Initializes the scheduler with <see cref="DateTimeOffset.MinValue"/> as the starting clock value and the
      /// default <see cref="DateTimeOffset"/> comparer.
      /// </summary>
      protected HistoricalSchedulerBase()
          : this(DateTimeOffset.MinValue)
      {
      }

      /// <summary>
      /// Initializes the scheduler with the given starting clock value and the default
      /// <see cref="DateTimeOffset"/> comparer.
      /// </summary>
      /// <param name="initialClock">Value the clock starts at.</param>
      protected HistoricalSchedulerBase(DateTimeOffset initialClock)
          : base(initialClock, Comparer<DateTimeOffset>.Default)
      {
      }

      /// <summary>
      /// Initializes the scheduler with the given starting clock value and a caller-supplied comparer for
      /// absolute time values.
      /// </summary>
      /// <param name="initialClock">Value the clock starts at.</param>
      /// <param name="comparer">Comparer used to order events by their absolute time.</param>
      protected HistoricalSchedulerBase(DateTimeOffset initialClock, IComparer<DateTimeOffset> comparer)
          : base(initialClock, comparer)
      {
      }

      /// <summary>
      /// Computes the absolute time obtained by offsetting <paramref name="absolute"/> by <paramref name="relative"/>.
      /// </summary>
      /// <param name="absolute">Absolute time to start from.</param>
      /// <param name="relative">Relative time to add to the absolute time.</param>
      /// <returns>The sum of the absolute and relative time values.</returns>
      protected override DateTimeOffset Add(DateTimeOffset absolute, TimeSpan relative) => absolute.Add(relative);

      /// <summary>
      /// Converts an absolute time value to a <see cref="DateTimeOffset"/>. Absolute time is already a
      /// <see cref="DateTimeOffset"/> for this scheduler, so the input is returned unchanged.
      /// </summary>
      /// <param name="absolute">Absolute time value to convert.</param>
      /// <returns>The same value that was passed in.</returns>
      protected override DateTimeOffset ToDateTimeOffset(DateTimeOffset absolute)
      {
          return absolute;
      }

      /// <summary>
      /// Converts a <see cref="TimeSpan"/> to a relative time value. Relative time is already a
      /// <see cref="TimeSpan"/> for this scheduler, so the input is returned unchanged.
      /// </summary>
      /// <param name="timeSpan"><see cref="TimeSpan"/> value to convert.</param>
      /// <returns>The same value that was passed in.</returns>
      protected override TimeSpan ToRelative(TimeSpan timeSpan)
      {
          return timeSpan;
      }
  }
  /// <summary>
  /// Concrete virtual time scheduler with <see cref="DateTimeOffset"/> absolute time and <see cref="TimeSpan"/>
  /// relative time. Pending work is held in an internal scheduler queue.
  /// </summary>
  [DebuggerDisplay("\\{ " +
      nameof(Clock) + " = {" + nameof(Clock) + "} " +
      nameof(Now) + " = {" + nameof(Now) + ".ToString(\"O\")} " +
      "\\}")]
  public class HistoricalScheduler : HistoricalSchedulerBase
  {
      // Holds every item scheduled through ScheduleAbsolute until it runs or is discarded as cancelled.
      private readonly SchedulerQueue<DateTimeOffset> _queue = new SchedulerQueue<DateTimeOffset>();

      /// <summary>
      /// Initializes the scheduler using the base class defaults (the minimum <see cref="DateTimeOffset"/> value
      /// as the starting clock value).
      /// </summary>
      public HistoricalScheduler()
      {
      }

      /// <summary>
      /// Initializes the scheduler with the given starting clock value.
      /// </summary>
      /// <param name="initialClock">Value the clock starts at.</param>
      public HistoricalScheduler(DateTimeOffset initialClock)
          : base(initialClock)
      {
      }

      /// <summary>
      /// Initializes the scheduler with the given starting clock value and a caller-supplied comparer for
      /// absolute time values.
      /// </summary>
      /// <param name="initialClock">Value the clock starts at.</param>
      /// <param name="comparer">Comparer used to order events by their absolute time.</param>
      /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is <c>null</c>.</exception>
      public HistoricalScheduler(DateTimeOffset initialClock, IComparer<DateTimeOffset> comparer)
          : base(initialClock, comparer)
      {
      }

      /// <summary>
      /// Finds the next item that should run, dropping cancelled items from the head of the queue along the way.
      /// </summary>
      /// <returns>
      /// The first non-cancelled item at the head of the queue, or <c>null</c> when the queue holds no such item.
      /// The returned item is left in the queue.
      /// </returns>
      protected override IScheduledItem<DateTimeOffset> GetNext()
      {
          while (_queue.Count > 0)
          {
              var head = _queue.Peek();

              if (!head.IsCanceled)
              {
                  // Only peeked, not dequeued: the item removes itself from the queue when it is invoked.
                  return head;
              }

              // Cancelled work never runs; discard it and inspect the new head.
              _queue.Dequeue();
          }

          return null;
      }

      /// <summary>
      /// Queues <paramref name="action"/> for execution at the absolute time <paramref name="dueTime"/>.
      /// </summary>
      /// <typeparam name="TState">Type of the state handed to the scheduled action.</typeparam>
      /// <param name="state">State handed to the action when it runs.</param>
      /// <param name="dueTime">Absolute time at which the action should run.</param>
      /// <param name="action">Action to run.</param>
      /// <returns>The scheduled item, which doubles as the disposable used to cancel the action (best effort).</returns>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
      public override IDisposable ScheduleAbsolute<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          if (action == null)
          {
              throw new ArgumentNullException(nameof(action));
          }

          // Declared before the delegate so the closure can capture it; assigned once the item exists.
          ScheduledItem<DateTimeOffset, TState> item = null;

          Func<IScheduler, TState, IDisposable> invoke = (self, arg) =>
          {
              // Take the item out of the queue before running the user action.
              _queue.Remove(item);
              return action(self, arg);
          };

          item = new ScheduledItem<DateTimeOffset, TState>(this, state, invoke, dueTime, Comparer);
          _queue.Enqueue(item);

          return item;
      }
  }
  141. }