// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.ComponentModel;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Represents an object that schedules units of work on the current thread.
    /// </summary>
    /// <seealso cref="Instance">Singleton instance of this type exposed through this static property.</seealso>
    public sealed class CurrentThreadScheduler : LocalScheduler
    {
        private static readonly Lazy<CurrentThreadScheduler> s_instance =
            new Lazy<CurrentThreadScheduler>(() => new CurrentThreadScheduler());

        private CurrentThreadScheduler()
        {
        }

        /// <summary>
        /// Gets the singleton instance of the current thread scheduler.
        /// </summary>
        public static CurrentThreadScheduler Instance => s_instance.Value;

        // The three fields below are [ThreadStatic]: every thread observes its own
        // queue, its own stopwatch and its own "running" flag.

        // Work scheduled while another action is executing; null when nothing is pending.
        [ThreadStatic]
        private static SchedulerQueue<TimeSpan> s_threadLocalQueue;

        // Stopwatch used to compute due times; created on first use by Time.
        [ThreadStatic]
        private static IStopwatch s_clock;

        // True while a Schedule call on this thread is executing an action or draining the queue.
        [ThreadStatic]
        private static bool running;

        private static SchedulerQueue<TimeSpan> GetQueue() => s_threadLocalQueue;

        private static void SetQueue(SchedulerQueue<TimeSpan> newQueue)
        {
            s_threadLocalQueue = newQueue;
        }

        /// <summary>
        /// Gets the time elapsed on this thread's stopwatch, starting the stopwatch on first access.
        /// </summary>
        private static TimeSpan Time
        {
            get
            {
                if (s_clock == null)
                {
                    s_clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
                }

                return s_clock.Elapsed;
            }
        }

        /// <summary>
        /// Gets a value that indicates whether the caller must call a Schedule method.
        /// </summary>
        [Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Justification = "Now marked as obsolete.")]
        [EditorBrowsable(EditorBrowsableState.Never)]
        [Obsolete(Constants_Core.OBSOLETE_SCHEDULEREQUIRED)] // Preferring static method call over instance method call.
        public bool ScheduleRequired => IsScheduleRequired;

        /// <summary>
        /// Gets a value that indicates whether the caller must call a Schedule method.
        /// </summary>
        /// <remarks>
        /// Returns <c>true</c> when no action is currently being run by this scheduler on the calling thread.
        /// </remarks>
        [EditorBrowsable(EditorBrowsableState.Advanced)]
        public static bool IsScheduleRequired => !running;

        /// <summary>
        /// Schedules an action to be executed after dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            SchedulerQueue<TimeSpan> queue;

            // There is no timed task and no task is currently running
            if (!running)
            {
                running = true;

                // execute directly without queueing
                IDisposable d;
                try
                {
                    // The wait is performed inside the guarded region: if the sleep
                    // itself throws, the thread-static state must still be reset,
                    // otherwise this thread would be left flagged as running and
                    // later calls would only enqueue work that nothing drains.
                    if (dueTime > TimeSpan.Zero)
                    {
                        ConcurrencyAbstractionLayer.Current.Sleep(dueTime);
                    }

                    d = action(this, state);
                }
                catch
                {
                    SetQueue(null);
                    running = false;
                    throw;
                }

                // did recursive tasks arrive?
                queue = GetQueue();

                // yes, run those in the queue as well
                if (queue != null)
                {
                    try
                    {
                        Trampoline.Run(queue);
                    }
                    finally
                    {
                        SetQueue(null);
                        running = false;
                    }
                }
                else
                {
                    running = false;
                }

                return d;
            }

            // An action is already running on this thread: defer the new work.
            queue = GetQueue();

            // if there is a task running or there is a queue
            if (queue == null)
            {
                queue = new SchedulerQueue<TimeSpan>(4);
                SetQueue(queue);
            }

            // Absolute due time on this thread's stopwatch.
            var dt = Time + Scheduler.Normalize(dueTime);

            // queue up more work
            var si = new ScheduledItem<TimeSpan, TState>(this, state, action, dt);
            queue.Enqueue(si);
            return si;
        }

        /// <summary>
        /// Drains a queue of scheduled items, waiting for each item's due time before invoking it.
        /// </summary>
        private static class Trampoline
        {
            /// <summary>
            /// Dequeues and invokes items until <paramref name="queue"/> is empty, skipping canceled items.
            /// </summary>
            /// <param name="queue">The queue to drain.</param>
            public static void Run(SchedulerQueue<TimeSpan> queue)
            {
                while (queue.Count > 0)
                {
                    var item = queue.Dequeue();
                    if (!item.IsCanceled)
                    {
                        var wait = item.DueTime - Time;
                        if (wait.Ticks > 0)
                        {
                            ConcurrencyAbstractionLayer.Current.Sleep(wait);
                        }

                        // Re-check after the wait: the item may have been canceled in the meantime.
                        if (!item.IsCanceled)
                        {
                            item.Invoke();
                        }
                    }
                }
            }
        }
    }
}