// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #if !WINDOWS && !NO_THREAD using System.Collections.Generic; using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Concurrency { /// /// Represents an object that schedules units of work on the CLR thread pool. /// /// Singleton instance of this type exposed through this static property. public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic { private static readonly ThreadPoolScheduler s_instance = new ThreadPoolScheduler(); private static readonly NewThreadScheduler s_newBackgroundThread = new NewThreadScheduler(action => new Thread(action) { IsBackground = true }); /// /// Gets the singleton instance of the CLR thread pool scheduler. /// public static ThreadPoolScheduler Instance { get { return s_instance; } } ThreadPoolScheduler() { } /// /// Schedules an action to be executed. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). /// is null. public override IDisposable Schedule(TState state, Func action) { if (action == null) throw new ArgumentNullException("action"); var d = new SingleAssignmentDisposable(); ThreadPool.QueueUserWorkItem(_ => { if (!d.IsDisposed) d.Disposable = action(this, state); }, null); return d; } /// /// Schedules an action to be executed after dueTime, using a System.Threading.Timer object. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) throw new ArgumentNullException("action"); var dt = Scheduler.Normalize(dueTime); if (dt.Ticks == 0) return Schedule(state, action); return new Timer(this, state, dt, action); } /// /// Schedules a long-running task by creating a new thread. Cancellation happens through polling. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). /// is null. public IDisposable ScheduleLongRunning(TState state, Action action) { if (action == null) throw new ArgumentNullException("action"); return s_newBackgroundThread.ScheduleLongRunning(state, action); } #if !NO_STOPWATCH /// /// Starts a new stopwatch object. /// /// New stopwatch object; started at the time of the request. public override IStopwatch StartStopwatch() { // // Strictly speaking, this explicit override is not necessary because the base implementation calls into // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. // return new StopwatchImpl(); } #endif /// /// Schedules a periodic piece of work, using a System.Threading.Timer object. /// /// The type of the state passed to the scheduled action. /// Initial state passed to the action upon the first iteration. /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). /// is null. /// is less than zero. public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException("period"); if (action == null) throw new ArgumentNullException("action"); if (period == TimeSpan.Zero) { return new FastPeriodicTimer(state, action); } else { return new PeriodicTimer(state, period, action); } } sealed class FastPeriodicTimer : IDisposable { private TState _state; private Func _action; private volatile bool _disposed; public FastPeriodicTimer(TState state, Func action) { _state = state; _action = action; ThreadPool.QueueUserWorkItem(Tick, null); } private void Tick(object state) { if (!_disposed) { _state = _action(_state); ThreadPool.QueueUserWorkItem(Tick, null); } } public void Dispose() { _disposed = true; _action = Stubs.I; } } #if USE_TIMER_SELF_ROOT // // See ConcurrencyAbstractionLayerImpl.cs for more information about the code // below and its timer rooting behavior. // sealed class Timer : IDisposable { private readonly MultipleAssignmentDisposable _disposable; private readonly IScheduler _parent; private readonly TState _state; private Func _action; private volatile System.Threading.Timer _timer; public Timer(IScheduler parent, TState state, TimeSpan dueTime, Func action) { _parent = parent; _state = state; _action = action; _disposable = new MultipleAssignmentDisposable(); _disposable.Disposable = Disposable.Create(Stop); // Don't want the spin wait in Tick to get stuck if this thread gets aborted. try { } finally { // // Rooting of the timer happens through the this.Tick delegate's target object, // which is the current instance and has a field to store the Timer instance. // _timer = new System.Threading.Timer(this.Tick, null, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); } } private void Tick(object state) { try { _disposable.Disposable = _action(_parent, _state); } finally { SpinWait.SpinUntil(IsTimerAssigned); Stop(); } } private bool IsTimerAssigned() { return _timer != null; } public void Dispose() { _disposable.Dispose(); } private void Stop() { var timer = _timer; if (timer != TimerStubs.Never) { _action = Nop; _timer = TimerStubs.Never; timer.Dispose(); } } private IDisposable Nop(IScheduler scheduler, TState state) { return Disposable.Empty; } } sealed class PeriodicTimer : IDisposable { private TState _state; private Func _action; private readonly AsyncLock _gate; private volatile System.Threading.Timer _timer; public PeriodicTimer(TState state, TimeSpan period, Func action) { _state = state; _action = action; _gate = new AsyncLock(); // // Rooting of the timer happens through the this.Tick delegate's target object, // which is the current instance and has a field to store the Timer instance. // _timer = new System.Threading.Timer(this.Tick, null, period, period); } private void Tick(object state) { _gate.Wait(() => { _state = _action(_state); }); } public void Dispose() { var timer = _timer; if (timer != null) { _action = Stubs.I; _timer = null; timer.Dispose(); _gate.Dispose(); } } } #else abstract class Timer { // // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. // #if !NO_HASHSET protected static readonly HashSet s_timers = new HashSet(); #else protected static readonly Dictionary s_timers = new Dictionary(); #endif } sealed class Timer : Timer, IDisposable { private readonly MultipleAssignmentDisposable _disposable; private readonly IScheduler _parent; private readonly TState _state; private Func _action; private System.Threading.Timer _timer; private bool _hasAdded; private bool _hasRemoved; public Timer(IScheduler parent, TState state, TimeSpan dueTime, Func action) { _disposable = new MultipleAssignmentDisposable(); _disposable.Disposable = Disposable.Create(Unroot); _parent = parent; _state = state; _action = action; _timer = new System.Threading.Timer(Tick, null, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); lock (s_timers) { if (!_hasRemoved) { #if !NO_HASHSET s_timers.Add(_timer); #else s_timers.Add(_timer, null); #endif _hasAdded = true; } } } private void Tick(object state) { try { _disposable.Disposable = _action(_parent, _state); } finally { Unroot(); } } private void Unroot() { _action = Nop; var timer = default(System.Threading.Timer); lock (s_timers) { if (!_hasRemoved) { timer = _timer; _timer = null; if (_hasAdded && timer != null) s_timers.Remove(timer); _hasRemoved = true; } } if (timer != null) timer.Dispose(); } private IDisposable Nop(IScheduler scheduler, TState state) { return Disposable.Empty; } public void Dispose() { _disposable.Dispose(); } } abstract class PeriodicTimer { // // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. // #if !NO_HASHSET protected static readonly HashSet s_timers = new HashSet(); #else protected static readonly Dictionary s_timers = new Dictionary(); #endif } sealed class PeriodicTimer : PeriodicTimer, IDisposable { private readonly AsyncLock _gate; private TState _state; private Func _action; private System.Threading.Timer _timer; public PeriodicTimer(TState state, TimeSpan period, Func action) { _gate = new AsyncLock(); _state = state; _action = action; _timer = new System.Threading.Timer(Tick, null, period, period); lock (s_timers) { #if !NO_HASHSET s_timers.Add(_timer); #else s_timers.Add(_timer, null); #endif } } private void Tick(object state) { _gate.Wait(() => { _state = _action(_state); }); } public void Dispose() { var timer = default(System.Threading.Timer); lock (s_timers) { timer = _timer; _timer = null; if (timer != null) s_timers.Remove(timer); } if (timer != null) { timer.Dispose(); _gate.Dispose(); _action = Stubs.I; } } } #endif } } #endif