// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Concurrency { /// /// Represents an object that schedules each unit of work on a separate thread. /// public sealed class NewThreadScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic { internal static readonly Lazy s_instance = new Lazy(() => new NewThreadScheduler()); private readonly Func _threadFactory; /// /// Creates an object that schedules each unit of work on a separate thread. /// public NewThreadScheduler() : this(action => new Thread(action)) { } /// /// Gets an instance of this scheduler that uses the default Thread constructor. /// public static NewThreadScheduler Default { get { return s_instance.Value; } } #if !NO_THREAD /// /// Creates an object that schedules each unit of work on a separate thread. /// /// Factory function for thread creation. /// is null. public NewThreadScheduler(Func threadFactory) { if (threadFactory == null) throw new ArgumentNullException("threadFactory"); #else private NewThreadScheduler(Func threadFactory) { #endif _threadFactory = threadFactory; } /// /// Schedules an action to be executed after dueTime. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) throw new ArgumentNullException("action"); var scheduler = new EventLoopScheduler(_threadFactory); scheduler.ExitIfEmpty = true; return scheduler.Schedule(state, dueTime, action); } /// /// Schedules a long-running task by creating a new thread. Cancellation happens through polling. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). /// is null. public IDisposable ScheduleLongRunning(TState state, Action action) { if (action == null) throw new ArgumentNullException("action"); var d = new BooleanDisposable(); var thread = _threadFactory(() => { // // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning // requires us to ensure the scheduled work gets an opportunity to observe // the cancellation request. // action(state, d); }); thread.Start(); return d; } /// /// Schedules a periodic piece of work by creating a new thread that goes to sleep when work has been dispatched and wakes up again at the next periodic due time. /// /// The type of the state passed to the scheduled action. /// Initial state passed to the action upon the first iteration. /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). /// is null. /// is less than TimeSpan.Zero. public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException("period"); if (action == null) throw new ArgumentNullException("action"); var periodic = new Periodic(state, period, action); var thread = _threadFactory(periodic.Run); thread.Start(); return periodic; } class Periodic : IDisposable { private readonly IStopwatch _stopwatch; private readonly TimeSpan _period; private readonly Func _action; private readonly object _cancel = new object(); private volatile bool _done; private TState _state; private TimeSpan _next; public Periodic(TState state, TimeSpan period, Func action) { _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch(); _period = period; _action = action; _state = state; _next = period; } public void Run() { while (!_done) { var timeout = Scheduler.Normalize(_next - _stopwatch.Elapsed); lock (_cancel) { if (Monitor.Wait(_cancel, timeout)) return; } _state = _action(_state); _next += _period; } } public void Dispose() { _done = true; lock (_cancel) { Monitor.Pulse(_cancel); } } } #if !NO_STOPWATCH /// /// Starts a new stopwatch object. /// /// New stopwatch object; started at the time of the request. public override IStopwatch StartStopwatch() { // // Strictly speaking, this explicit override is not necessary because the base implementation calls into // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. // return new StopwatchImpl(); } #endif } }