// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
using System.ComponentModel;
using System.Threading;
using System.Reactive.Disposables;
namespace System.Reactive.Concurrency
{
///
/// Represents an object that schedules units of work on the current thread.
///
/// Singleton instance of this type exposed through this static property.
public sealed class CurrentThreadScheduler : LocalScheduler
{
private static readonly Lazy s_instance = new Lazy(() => new CurrentThreadScheduler());
CurrentThreadScheduler()
{
}
///
/// Gets the singleton instance of the current thread scheduler.
///
public static CurrentThreadScheduler Instance
{
get { return s_instance.Value; }
}
#if !NO_TLS
[ThreadStatic]
static SchedulerQueue s_threadLocalQueue;
[ThreadStatic]
static IStopwatch s_clock;
private static SchedulerQueue GetQueue()
{
return s_threadLocalQueue;
}
private static void SetQueue(SchedulerQueue newQueue)
{
s_threadLocalQueue = newQueue;
}
private static TimeSpan Time
{
get
{
if (s_clock == null)
s_clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
return s_clock.Elapsed;
}
}
#else
private static readonly System.Collections.Generic.Dictionary> s_queues = new System.Collections.Generic.Dictionary>();
private static readonly System.Collections.Generic.Dictionary s_clocks = new System.Collections.Generic.Dictionary();
private static SchedulerQueue GetQueue()
{
lock (s_queues)
{
var item = default(SchedulerQueue);
if (s_queues.TryGetValue(Thread.CurrentThread.ManagedThreadId, out item))
return item;
return null;
}
}
private static void SetQueue(SchedulerQueue newQueue)
{
lock (s_queues)
{
if (newQueue == null)
s_queues.Remove(Thread.CurrentThread.ManagedThreadId);
else
s_queues[Thread.CurrentThread.ManagedThreadId] = newQueue;
}
}
private static TimeSpan Time
{
get
{
lock (s_clocks)
{
var clock = default(IStopwatch);
if (!s_clocks.TryGetValue(Thread.CurrentThread.ManagedThreadId, out clock))
s_clocks[Thread.CurrentThread.ManagedThreadId] = clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
return clock.Elapsed;
}
}
}
#endif
///
/// Gets a value that indicates whether the caller must call a Schedule method.
///
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Justification = "Now marked as obsolete.")]
[EditorBrowsable(EditorBrowsableState.Never)]
[Obsolete(Constants_Core.OBSOLETE_SCHEDULEREQUIRED)] // Preferring static method call over instance method call.
public bool ScheduleRequired
{
get
{
return IsScheduleRequired;
}
}
///
/// Gets a value that indicates whether the caller must call a Schedule method.
///
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static bool IsScheduleRequired
{
get
{
return GetQueue() == null;
}
}
///
/// Schedules an action to be executed after dueTime.
///
/// The type of the state passed to the scheduled action.
/// State passed to the action to be executed.
/// Action to be executed.
/// Relative time after which to execute the action.
/// The disposable object used to cancel the scheduled action (best effort).
/// is null.
public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
{
if (action == null)
throw new ArgumentNullException(nameof(action));
var dt = Time + Scheduler.Normalize(dueTime);
var si = new ScheduledItem(this, state, action, dt);
var queue = GetQueue();
if (queue == null)
{
queue = new SchedulerQueue(4);
queue.Enqueue(si);
CurrentThreadScheduler.SetQueue(queue);
try
{
Trampoline.Run(queue);
}
finally
{
CurrentThreadScheduler.SetQueue(null);
}
}
else
{
queue.Enqueue(si);
}
return Disposable.Create(si.Cancel);
}
static class Trampoline
{
public static void Run(SchedulerQueue queue)
{
while (queue.Count > 0)
{
var item = queue.Dequeue();
if (!item.IsCanceled)
{
var wait = item.DueTime - CurrentThreadScheduler.Time;
if (wait.Ticks > 0)
{
ConcurrencyAbstractionLayer.Current.Sleep(wait);
}
if (!item.IsCanceled)
item.Invoke();
}
}
}
}
}
}