// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Represents an object that schedules units of work on the platform's default scheduler.
    /// </summary>
    /// <seealso cref="Instance">Singleton instance of this type exposed through this static property.</seealso>
    public sealed class DefaultScheduler : LocalScheduler, ISchedulerPeriodic
    {
        // Lazily constructed singleton; Lazy<T> defers creation until Instance is first read.
        private static readonly Lazy<DefaultScheduler> s_instance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());

        // Platform services (work-item queueing, timers, threads) used by every scheduling method below.
        private static readonly IConcurrencyAbstractionLayer s_cal = ConcurrencyAbstractionLayer.Current;

        /// <summary>
        /// Gets the singleton instance of the default scheduler.
        /// </summary>
        public static DefaultScheduler Instance
        {
            get { return s_instance.Value; }
        }

        // Private constructor: instances are only obtainable through Instance.
        private DefaultScheduler()
        {
        }

        /// <summary>
        /// Schedules an action to be executed.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException("action");
            }

            var d = new SingleAssignmentDisposable();

            var cancel = s_cal.QueueUserWorkItem(_ =>
            {
                // Skip the work if the caller disposed the returned handle before the item ran.
                if (!d.IsDisposed)
                {
                    d.Disposable = action(this, state);
                }
            }, null);

            // Disposing the result disposes both the action's own disposable and the queued work item.
            return StableCompositeDisposable.Create(
                d,
                cancel
            );
        }

        /// <summary>
        /// Schedules an action to be executed after dueTime, using a System.Threading.Timer object.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException("action");
            }

            // NOTE(review): Scheduler.Normalize presumably clamps negative due times to zero - confirm.
            var dt = Scheduler.Normalize(dueTime);

            // No delay requested: take the immediate path instead of starting a timer.
            if (dt.Ticks == 0)
            {
                return Schedule(state, action);
            }

            var d = new SingleAssignmentDisposable();

            var cancel = s_cal.StartTimer(_ =>
            {
                // Skip the work if the caller disposed the returned handle before the timer fired.
                if (!d.IsDisposed)
                {
                    d.Disposable = action(this, state);
                }
            }, null, dt);

            // Disposing the result disposes both the action's own disposable and the timer.
            return StableCompositeDisposable.Create(
                d,
                cancel
            );
        }

        /// <summary>
        /// Schedules a periodic piece of work, using a System.Threading.Timer object.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">Initial state passed to the action upon the first iteration.</param>
        /// <param name="period">Period for running the work periodically.</param>
        /// <param name="action">Action to be executed, potentially updating the state.</param>
        /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
        {
            if (period < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("period");
            }

            if (action == null)
            {
                throw new ArgumentNullException("action");
            }

            // The state is threaded through iterations: each tick's result becomes the next tick's input.
            var state1 = state;

            // NOTE(review): AsyncLock presumably serializes overlapping ticks so that state1 is never
            // updated concurrently - confirm against the AsyncLock implementation.
            var gate = new AsyncLock();

            var cancel = s_cal.StartPeriodicTimer(() =>
            {
                gate.Wait(() =>
                {
                    state1 = action(state1);
                });
            }, period);

            return Disposable.Create(() =>
            {
                cancel.Dispose();
                gate.Dispose();

                // Drop the reference to the user's delegate so any tick that still runs no longer invokes it.
                // NOTE(review): Stubs<TState>.I is presumably the identity function - confirm.
                action = Stubs<TState>.I;
            });
        }

        /// <summary>
        /// Discovers scheduler services by interface type.
        /// </summary>
        /// <param name="serviceType">Scheduler service interface type to discover.</param>
        /// <returns>Object implementing the requested service, if available; null otherwise.</returns>
        protected override object GetService(Type serviceType)
        {
            // Long-running support is only advertised when the platform layer reports it;
            // otherwise the request falls through to the base implementation.
            if (serviceType == typeof(ISchedulerLongRunning) && s_cal.SupportsLongRunning)
            {
                return LongRunning.Instance;
            }

            return base.GetService(serviceType);
        }

        /// <summary>
        /// Long-running scheduling service that starts each unit of work through the
        /// concurrency abstraction layer's StartThread.
        /// </summary>
        private sealed class LongRunning : ISchedulerLongRunning
        {
            /// <summary>
            /// Shared instance handed out by <see cref="DefaultScheduler.GetService"/>.
            /// </summary>
            public static readonly ISchedulerLongRunning Instance = new LongRunning();

            /// <summary>
            /// Schedules a long-running piece of work.
            /// </summary>
            /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
            /// <param name="state">State passed to the action to be executed.</param>
            /// <param name="action">Action to be executed; receives the state and a cancelable it can poll for cancellation.</param>
            /// <returns>The disposable object used to request cancellation of the scheduled work.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
            public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
            {
                if (action == null)
                {
                    throw new ArgumentNullException("action");
                }

                var cancel = new BooleanDisposable();

                DefaultScheduler.s_cal.StartThread(
                    arg =>
                    {
                        var d = (ICancelable)arg;

                        //
                        // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
                        // requires us to ensure the scheduled work gets an opportunity to observe
                        // the cancellation request.
                        //
                        action(state, d);
                    },
                    cancel
                );

                return cancel;
            }
        }
    }
}