// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.ComponentModel;
using System.Threading;
using System.Reactive.Disposables;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Represents an object that schedules units of work on the current thread.
    /// </summary>
    /// <seealso cref="Scheduler.CurrentThread">Singleton instance of this type exposed through this static property.</seealso>
    public sealed class CurrentThreadScheduler : LocalScheduler
    {
        private static readonly Lazy<CurrentThreadScheduler> s_instance = new Lazy<CurrentThreadScheduler>(() => new CurrentThreadScheduler());

        // Private constructor: instances are only handed out through Instance.
        private CurrentThreadScheduler()
        {
        }

        /// <summary>
        /// Gets the singleton instance of the current thread scheduler.
        /// </summary>
        public static CurrentThreadScheduler Instance
        {
            get { return s_instance.Value; }
        }

#if !NO_TLS
        // Per-thread work queue; null while no trampoline is running on the thread.
        [ThreadStatic]
        private static SchedulerQueue<TimeSpan> s_threadLocalQueue;

        // Per-thread stopwatch, created lazily on the first read of Time.
        [ThreadStatic]
        private static IStopwatch s_clock;

        private static SchedulerQueue<TimeSpan> GetQueue()
        {
            return s_threadLocalQueue;
        }

        private static void SetQueue(SchedulerQueue<TimeSpan> newQueue)
        {
            s_threadLocalQueue = newQueue;
        }

        // Elapsed time of the calling thread's stopwatch; used as the time base for due times.
        private static TimeSpan Time
        {
            get
            {
                if (s_clock == null)
                {
                    s_clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
                }

                return s_clock.Elapsed;
            }
        }
#else
        // Without [ThreadStatic] support, per-thread state is kept in dictionaries keyed by
        // managed thread id; each dictionary doubles as the lock guarding its own access.
        private static readonly System.Collections.Generic.Dictionary<int, SchedulerQueue<TimeSpan>> s_queues = new System.Collections.Generic.Dictionary<int, SchedulerQueue<TimeSpan>>();

        private static readonly System.Collections.Generic.Dictionary<int, IStopwatch> s_clocks = new System.Collections.Generic.Dictionary<int, IStopwatch>();

        private static SchedulerQueue<TimeSpan> GetQueue()
        {
            lock (s_queues)
            {
                var item = default(SchedulerQueue<TimeSpan>);
                if (s_queues.TryGetValue(Thread.CurrentThread.ManagedThreadId, out item))
                {
                    return item;
                }

                return null;
            }
        }

        private static void SetQueue(SchedulerQueue<TimeSpan> newQueue)
        {
            lock (s_queues)
            {
                // A null queue means "no trampoline running": drop the entry instead of storing null.
                if (newQueue == null)
                {
                    s_queues.Remove(Thread.CurrentThread.ManagedThreadId);
                }
                else
                {
                    s_queues[Thread.CurrentThread.ManagedThreadId] = newQueue;
                }
            }
        }

        // Elapsed time of the calling thread's stopwatch; used as the time base for due times.
        private static TimeSpan Time
        {
            get
            {
                lock (s_clocks)
                {
                    var clock = default(IStopwatch);
                    if (!s_clocks.TryGetValue(Thread.CurrentThread.ManagedThreadId, out clock))
                    {
                        s_clocks[Thread.CurrentThread.ManagedThreadId] = clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
                    }

                    return clock.Elapsed;
                }
            }
        }
#endif

        /// <summary>
        /// Gets a value that indicates whether the caller must call a Schedule method.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Justification = "Now marked as obsolete.")]
        [EditorBrowsable(EditorBrowsableState.Never)]
        [Obsolete(Constants_Core.OBSOLETE_SCHEDULEREQUIRED)] // Preferring static method call over instance method call.
        public bool ScheduleRequired
        {
            get { return IsScheduleRequired; }
        }

        /// <summary>
        /// Gets a value that indicates whether the caller must call a Schedule method.
        /// </summary>
        [EditorBrowsable(EditorBrowsableState.Advanced)]
        public static bool IsScheduleRequired
        {
            // No queue registered for this thread means no trampoline is currently running on it.
            get { return GetQueue() == null; }
        }

        /// <summary>
        /// Schedules an action to be executed after dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // Due time is expressed on the calling thread's stopwatch time base.
            var dt = Time + Scheduler.Normalize(dueTime);

            var si = new ScheduledItem<TimeSpan, TState>(this, state, action, dt);

            var queue = GetQueue();

            if (queue == null)
            {
                // Outermost call on this thread: install a queue and drain it here. Calls made
                // while the trampoline runs see the installed queue and take the else branch.
                queue = new SchedulerQueue<TimeSpan>(4);
                queue.Enqueue(si);

                CurrentThreadScheduler.SetQueue(queue);
                try
                {
                    Trampoline.Run(queue);
                }
                finally
                {
                    // Always uninstall the queue, even if a scheduled action throws.
                    CurrentThreadScheduler.SetQueue(null);
                }
            }
            else
            {
                queue.Enqueue(si);
            }

            return Disposable.Create(si.Cancel);
        }

        // Drains a scheduler queue on the calling thread.
        private static class Trampoline
        {
            public static void Run(SchedulerQueue<TimeSpan> queue)
            {
                while (queue.Count > 0)
                {
                    var item = queue.Dequeue();
                    if (!item.IsCanceled)
                    {
                        var wait = item.DueTime - CurrentThreadScheduler.Time;
                        if (wait.Ticks > 0)
                        {
                            ConcurrencyAbstractionLayer.Current.Sleep(wait);
                        }

                        // Re-check after the wait: the item may have been canceled in the meantime.
                        if (!item.IsCanceled)
                        {
                            item.Invoke();
                        }
                    }
                }
            }
        }
    }
}