HistoricalScheduler.cs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. namespace System.Reactive.Concurrency
  7. {
  8. /// <summary>
  9. /// Base class for historical schedulers, which are virtual time schedulers that use <see cref="DateTimeOffset"/> for absolute time and <see cref="TimeSpan"/> for relative time.
  10. /// </summary>
  11. public abstract class HistoricalSchedulerBase : VirtualTimeSchedulerBase<DateTimeOffset, TimeSpan>
  12. {
  13. /// <summary>
  14. /// Creates a new historical scheduler with the minimum value of <see cref="DateTimeOffset"/> as the initial clock value.
  15. /// </summary>
  16. protected HistoricalSchedulerBase()
  17. : base(DateTimeOffset.MinValue, Comparer<DateTimeOffset>.Default)
  18. {
  19. }
  20. /// <summary>
  21. /// Creates a new historical scheduler with the specified initial clock value.
  22. /// </summary>
  23. /// <param name="initialClock">Initial clock value.</param>
  24. protected HistoricalSchedulerBase(DateTimeOffset initialClock)
  25. : base(initialClock, Comparer<DateTimeOffset>.Default)
  26. {
  27. }
  28. /// <summary>
  29. /// Creates a new historical scheduler with the specified initial clock value and absolute time comparer.
  30. /// </summary>
  31. /// <param name="initialClock">Initial value for the clock.</param>
  32. /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param>
  33. protected HistoricalSchedulerBase(DateTimeOffset initialClock, IComparer<DateTimeOffset> comparer)
  34. : base(initialClock, comparer)
  35. {
  36. }
  37. /// <summary>
  38. /// Adds a relative time value to an absolute time value.
  39. /// </summary>
  40. /// <param name="absolute">Absolute time value.</param>
  41. /// <param name="relative">Relative time value to add.</param>
  42. /// <returns>The resulting absolute time sum value.</returns>
  43. protected override DateTimeOffset Add(DateTimeOffset absolute, TimeSpan relative)
  44. {
  45. return absolute.Add(relative);
  46. }
  47. /// <summary>
  48. /// Converts the absolute time value to a <see cref="DateTimeOffset"/> value.
  49. /// </summary>
  50. /// <param name="absolute">Absolute time value to convert.</param>
  51. /// <returns>The corresponding <see cref="DateTimeOffset"/> value.</returns>
  52. protected override DateTimeOffset ToDateTimeOffset(DateTimeOffset absolute) => absolute;
  53. /// <summary>
  54. /// Converts the <see cref="TimeSpan"/> value to a relative time value.
  55. /// </summary>
  56. /// <param name="timeSpan"><see cref="TimeSpan"/> value to convert.</param>
  57. /// <returns>The corresponding relative time value.</returns>
  58. protected override TimeSpan ToRelative(TimeSpan timeSpan) => timeSpan;
  59. }
  60. /// <summary>
  61. /// Provides a virtual time scheduler that uses <see cref="DateTimeOffset"/> for absolute time and <see cref="TimeSpan"/> for relative time.
  62. /// </summary>
  63. [DebuggerDisplay("\\{ " +
  64. nameof(Clock) + " = {" + nameof(Clock) + "} " +
  65. nameof(Now) + " = {" + nameof(Now) + ".ToString(\"O\")} " +
  66. "\\}")]
  67. public class HistoricalScheduler : HistoricalSchedulerBase
  68. {
  69. private readonly SchedulerQueue<DateTimeOffset> _queue = new SchedulerQueue<DateTimeOffset>();
  70. /// <summary>
  71. /// Creates a new historical scheduler with the minimum value of <see cref="DateTimeOffset"/> as the initial clock value.
  72. /// </summary>
  73. public HistoricalScheduler()
  74. {
  75. }
  76. /// <summary>
  77. /// Creates a new historical scheduler with the specified initial clock value.
  78. /// </summary>
  79. /// <param name="initialClock">Initial value for the clock.</param>
  80. public HistoricalScheduler(DateTimeOffset initialClock)
  81. : base(initialClock)
  82. {
  83. }
  84. /// <summary>
  85. /// Creates a new historical scheduler with the specified initial clock value.
  86. /// </summary>
  87. /// <param name="initialClock">Initial value for the clock.</param>
  88. /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param>
  89. /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is <c>null</c>.</exception>
  90. public HistoricalScheduler(DateTimeOffset initialClock, IComparer<DateTimeOffset> comparer)
  91. : base(initialClock, comparer)
  92. {
  93. }
  94. /// <summary>
  95. /// Gets the next scheduled item to be executed.
  96. /// </summary>
  97. /// <returns>The next scheduled item.</returns>
  98. protected override IScheduledItem<DateTimeOffset>? GetNext()
  99. {
  100. while (_queue.Count > 0)
  101. {
  102. var next = _queue.Peek();
  103. if (next.IsCanceled)
  104. {
  105. _queue.Dequeue();
  106. }
  107. else
  108. {
  109. return next;
  110. }
  111. }
  112. return null;
  113. }
  114. /// <summary>
  115. /// Schedules an action to be executed at <paramref name="dueTime"/>.
  116. /// </summary>
  117. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  118. /// <param name="state">State passed to the action to be executed.</param>
  119. /// <param name="action">Action to be executed.</param>
  120. /// <param name="dueTime">Absolute time at which to execute the action.</param>
  121. /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  122. /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
  123. public override IDisposable ScheduleAbsolute<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
  124. {
  125. if (action == null)
  126. {
  127. throw new ArgumentNullException(nameof(action));
  128. }
  129. ScheduledItem<DateTimeOffset, TState>? si = null;
  130. var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) =>
  131. {
  132. _queue.Remove(si!); // NB: Assigned before function is invoked.
  133. return action(scheduler, state1);
  134. });
  135. si = new ScheduledItem<DateTimeOffset, TState>(this, state, run, dueTime, Comparer);
  136. _queue.Enqueue(si);
  137. return si;
  138. }
  139. }
  140. }