// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

#if !WINDOWS && !NO_THREAD
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Represents an object that schedules units of work on the CLR thread pool.
    /// </summary>
    /// <seealso cref="ThreadPoolScheduler.Instance">Singleton instance of this type exposed through this static property.</seealso>
    public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
    {
        private static readonly Lazy<ThreadPoolScheduler> s_instance = new Lazy<ThreadPoolScheduler>(() => new ThreadPoolScheduler());

        // Long-running work is delegated to a scheduler that creates background threads.
        private static readonly Lazy<NewThreadScheduler> s_newBackgroundThread = new Lazy<NewThreadScheduler>(() => new NewThreadScheduler(action => new Thread(action) { IsBackground = true }));

        /// <summary>
        /// Gets the singleton instance of the CLR thread pool scheduler.
        /// </summary>
        public static ThreadPoolScheduler Instance => s_instance.Value;

        private ThreadPoolScheduler()
        {
        }

        /// <summary>
        /// Schedules an action to be executed.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            var workItem = new UserWorkItem<TState>(this, state, action);

            // The work item travels as the callback state so the lambda captures nothing.
            ThreadPool.QueueUserWorkItem(
                closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
                workItem);

            return workItem;
        }

        /// <summary>
        /// Schedules an action to be executed after dueTime, using a System.Threading.Timer object.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            var dt = Scheduler.Normalize(dueTime);

            // A normalized due time of zero needs no timer; queue the work directly.
            if (dt.Ticks == 0)
            {
                return Schedule(state, action);
            }

            var workItem = new UserWorkItem<TState>(this, state, action);

            // One-shot timer: the period is infinite, so the callback fires a single time after dt.
            workItem.CancelQueueDisposable = new Timer(
                closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
                workItem,
                dt,
                Timeout.InfiniteTimeSpan);

            return workItem;
        }

        /// <summary>
        /// Schedules a long-running task by creating a new thread. Cancellation happens through polling.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return s_newBackgroundThread.Value.ScheduleLongRunning(state, action);
        }

        /// <summary>
        /// Starts a new stopwatch object.
        /// </summary>
        /// <returns>New stopwatch object; started at the time of the request.</returns>
        public override IStopwatch StartStopwatch()
        {
            //
            // Strictly speaking, this explicit override is not necessary because the base implementation calls into
            // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
            // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
            //
            return new StopwatchImpl();
        }

        /// <summary>
        /// Schedules a periodic piece of work, using a System.Threading.Timer object.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">Initial state passed to the action upon the first iteration.</param>
        /// <param name="period">Period for running the work periodically.</param>
        /// <param name="action">Action to be executed, potentially updating the state.</param>
        /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than zero.</exception>
        public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
        {
            if (period < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException(nameof(period));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // A zero period is served by re-queueing on the thread pool instead of a timer.
            if (period == TimeSpan.Zero)
            {
                return new FastPeriodicTimer<TState>(state, action);
            }
            else
            {
                return new PeriodicTimer<TState>(state, period, action);
            }
        }

        /// <summary>
        /// Periodic work for a zero period: each invocation re-queues itself on the thread pool
        /// until the instance is disposed.
        /// </summary>
        /// <typeparam name="TState">The type of the state threaded through the action.</typeparam>
        private sealed class FastPeriodicTimer<TState> : IDisposable
        {
            private TState _state;
            private Func<TState, TState> _action;
            private volatile bool _disposed;

            public FastPeriodicTimer(TState state, Func<TState, TState> action)
            {
                _state = state;
                _action = action;

                ThreadPool.QueueUserWorkItem(_ => Tick(_), this); // Replace with method group as soon as Roslyn will cache the delegate then.
            }

            /// <summary>
            /// Runs one iteration and queues the next, unless the timer has been disposed.
            /// </summary>
            /// <param name="state">The <see cref="FastPeriodicTimer{TState}"/> instance being ticked.</param>
            private static void Tick(object state)
            {
                var timer = (FastPeriodicTimer<TState>)state;

                if (!timer._disposed)
                {
                    timer._state = timer._action(timer._state);
                    ThreadPool.QueueUserWorkItem(_ => Tick(_), timer);
                }
            }

            /// <summary>
            /// Stops further iterations (best effort) and drops the reference to the user-supplied action.
            /// </summary>
            public void Dispose()
            {
                _disposed = true;
                _action = Stubs<TState>.I;
            }
        }

        /// <summary>
        /// Periodic work for a non-zero period, driven by a <see cref="Timer"/>.
        /// </summary>
        /// <typeparam name="TState">The type of the state threaded through the action.</typeparam>
        private sealed class PeriodicTimer<TState> : IDisposable
        {
            private TState _state;
            private Func<TState, TState> _action;

            private readonly AsyncLock _gate;
            private volatile Timer _timer;

            public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action)
            {
                _state = state;
                _action = action;

                _gate = new AsyncLock();

                //
                // Rooting of the timer happens through the state object passed to the timer callback,
                // which is the current instance and has a field to store the Timer instance.
                //
                _timer = new Timer(@this => ((PeriodicTimer<TState>)@this).Tick(), this, period, period);
            }

            /// <summary>
            /// Runs one iteration of the action through the gate.
            /// </summary>
            private void Tick()
            {
                // NOTE(review): presumably the AsyncLock serializes overlapping ticks - confirm against AsyncLock.Wait.
                _gate.Wait(
                    this,
                    @this =>
                    {
                        @this._state = @this._action(@this._state);
                    });
            }

            /// <summary>
            /// Stops the timer (best effort) and drops the reference to the user-supplied action.
            /// Subsequent calls are no-ops once the timer field has been cleared.
            /// </summary>
            public void Dispose()
            {
                var timer = _timer;
                if (timer != null)
                {
                    _action = Stubs<TState>.I;
                    _timer = null;

                    timer.Dispose();
                    _gate.Dispose();
                }
            }
        }
    }
}
#endif