// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
using System.ComponentModel;
namespace System.Reactive.Concurrency
{
///
/// Represents an object that schedules units of work on the current thread.
///
/// Singleton instance of this type exposed through this static property.
public sealed class CurrentThreadScheduler : LocalScheduler
{
private static readonly Lazy s_instance = new Lazy(() => new CurrentThreadScheduler());
private CurrentThreadScheduler()
{
}
///
/// Gets the singleton instance of the current thread scheduler.
///
public static CurrentThreadScheduler Instance => s_instance.Value;
[ThreadStatic]
private static SchedulerQueue s_threadLocalQueue;
[ThreadStatic]
private static IStopwatch s_clock;
[ThreadStatic]
private static bool running;
private static SchedulerQueue GetQueue() => s_threadLocalQueue;
private static void SetQueue(SchedulerQueue newQueue)
{
s_threadLocalQueue = newQueue;
}
private static TimeSpan Time
{
get
{
if (s_clock == null)
{
s_clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
}
return s_clock.Elapsed;
}
}
///
/// Gets a value that indicates whether the caller must call a Schedule method.
///
[Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Justification = "Now marked as obsolete.")]
[EditorBrowsable(EditorBrowsableState.Never)]
[Obsolete(Constants_Core.OBSOLETE_SCHEDULEREQUIRED)] // Preferring static method call over instance method call.
public bool ScheduleRequired => IsScheduleRequired;
///
/// Gets a value that indicates whether the caller must call a Schedule method.
///
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static bool IsScheduleRequired => !running;
///
/// Schedules an action to be executed after dueTime.
///
/// The type of the state passed to the scheduled action.
/// State passed to the action to be executed.
/// Action to be executed.
/// Relative time after which to execute the action.
/// The disposable object used to cancel the scheduled action (best effort).
/// is null.
public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
{
if (action == null)
{
throw new ArgumentNullException(nameof(action));
}
var queue = default(SchedulerQueue);
// There is no timed task and no task is currently running
if (!running)
{
running = true;
if (dueTime > TimeSpan.Zero)
{
ConcurrencyAbstractionLayer.Current.Sleep(dueTime);
}
// execute directly without queueing
IDisposable d;
try
{
d = action(this, state);
}
catch
{
SetQueue(null);
running = false;
throw;
}
// did recursive tasks arrive?
queue = GetQueue();
// yes, run those in the queue as well
if (queue != null)
{
try
{
Trampoline.Run(queue);
}
finally
{
SetQueue(null);
running = false;
}
}
else
{
running = false;
}
return d;
}
queue = GetQueue();
// if there is a task running or there is a queue
if (queue == null)
{
queue = new SchedulerQueue(4);
SetQueue(queue);
}
var dt = Time + Scheduler.Normalize(dueTime);
// queue up more work
var si = new ScheduledItem(this, state, action, dt);
queue.Enqueue(si);
return si;
}
private static class Trampoline
{
public static void Run(SchedulerQueue queue)
{
while (queue.Count > 0)
{
var item = queue.Dequeue();
if (!item.IsCanceled)
{
var wait = item.DueTime - Time;
if (wait.Ticks > 0)
{
ConcurrencyAbstractionLayer.Current.Sleep(wait);
}
if (!item.IsCanceled)
{
item.Invoke();
}
}
}
}
}
}
}