// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
///
/// Represents an object that schedules each unit of work on a separate thread.
///
public sealed class NewThreadScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
{
internal static readonly Lazy s_instance = new Lazy(() => new NewThreadScheduler());
private readonly Func _threadFactory;
///
/// Creates an object that schedules each unit of work on a separate thread.
///
public NewThreadScheduler()
: this(action => new Thread(action))
{
}
///
/// Gets an instance of this scheduler that uses the default Thread constructor.
///
public static NewThreadScheduler Default
{
get
{
return s_instance.Value;
}
}
#if !NO_THREAD
///
/// Creates an object that schedules each unit of work on a separate thread.
///
/// Factory function for thread creation.
/// is null.
public NewThreadScheduler(Func threadFactory)
{
if (threadFactory == null)
throw new ArgumentNullException("threadFactory");
#else
private NewThreadScheduler(Func threadFactory)
{
#endif
_threadFactory = threadFactory;
}
///
/// Schedules an action to be executed after dueTime.
///
/// The type of the state passed to the scheduled action.
/// State passed to the action to be executed.
/// Action to be executed.
/// Relative time after which to execute the action.
/// The disposable object used to cancel the scheduled action (best effort).
/// is null.
public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
{
if (action == null)
throw new ArgumentNullException("action");
var scheduler = new EventLoopScheduler(_threadFactory);
scheduler.ExitIfEmpty = true;
return scheduler.Schedule(state, dueTime, action);
}
///
/// Schedules a long-running task by creating a new thread. Cancellation happens through polling.
///
/// The type of the state passed to the scheduled action.
/// State passed to the action to be executed.
/// Action to be executed.
/// The disposable object used to cancel the scheduled action (best effort).
/// is null.
public IDisposable ScheduleLongRunning(TState state, Action action)
{
if (action == null)
throw new ArgumentNullException("action");
var d = new BooleanDisposable();
var thread = _threadFactory(() =>
{
//
// Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
// requires us to ensure the scheduled work gets an opportunity to observe
// the cancellation request.
//
action(state, d);
});
thread.Start();
return d;
}
///
/// Schedules a periodic piece of work by creating a new thread that goes to sleep when work has been dispatched and wakes up again at the next periodic due time.
///
/// The type of the state passed to the scheduled action.
/// Initial state passed to the action upon the first iteration.
/// Period for running the work periodically.
/// Action to be executed, potentially updating the state.
/// The disposable object used to cancel the scheduled recurring action (best effort).
/// is null.
/// is less than TimeSpan.Zero.
public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action)
{
if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("period");
if (action == null)
throw new ArgumentNullException("action");
var periodic = new Periodic(state, period, action);
var thread = _threadFactory(periodic.Run);
thread.Start();
return periodic;
}
class Periodic : IDisposable
{
private readonly IStopwatch _stopwatch;
private readonly TimeSpan _period;
private readonly Func _action;
private readonly object _cancel = new object();
private volatile bool _done;
private TState _state;
private TimeSpan _next;
public Periodic(TState state, TimeSpan period, Func action)
{
_stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();
_period = period;
_action = action;
_state = state;
_next = period;
}
public void Run()
{
while (!_done)
{
var timeout = Scheduler.Normalize(_next - _stopwatch.Elapsed);
lock (_cancel)
{
if (Monitor.Wait(_cancel, timeout))
return;
}
_state = _action(_state);
_next += _period;
}
}
public void Dispose()
{
_done = true;
lock (_cancel)
{
Monitor.Pulse(_cancel);
}
}
}
#if !NO_STOPWATCH
///
/// Starts a new stopwatch object.
///
/// New stopwatch object; started at the time of the request.
public override IStopwatch StartStopwatch()
{
//
// Strictly speaking, this explicit override is not necessary because the base implementation calls into
// the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
// where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
//
return new StopwatchImpl();
}
#endif
}
}