1
0

ImmediateScheduler.cs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Threading;
  3. using System.Reactive.Disposables;
  4. namespace System.Reactive.Concurrency
  5. {
  6. /// <summary>
  7. /// Represents an object that schedules units of work to run immediately on the current thread.
  8. /// </summary>
  9. /// <seealso cref="Scheduler.Immediate">Singleton instance of this type exposed through this static property.</seealso>
  10. public sealed class ImmediateScheduler : LocalScheduler
  11. {
  12. private static readonly Lazy<ImmediateScheduler> s_instance = new Lazy<ImmediateScheduler>(() => new ImmediateScheduler());
  13. ImmediateScheduler()
  14. {
  15. }
  16. /// <summary>
  17. /// Gets the singleton instance of the immediate scheduler.
  18. /// </summary>
  19. public static ImmediateScheduler Instance
  20. {
  21. get { return s_instance.Value; }
  22. }
  23. /// <summary>
  24. /// Schedules an action to be executed.
  25. /// </summary>
  26. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  27. /// <param name="state">State passed to the action to be executed.</param>
  28. /// <param name="action">Action to be executed.</param>
  29. /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  30. /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  31. public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
  32. {
  33. if (action == null)
  34. throw new ArgumentNullException("action");
  35. return action(new AsyncLockScheduler(), state);
  36. }
  37. /// <summary>
  38. /// Schedules an action to be executed after dueTime.
  39. /// </summary>
  40. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  41. /// <param name="state">State passed to the action to be executed.</param>
  42. /// <param name="action">Action to be executed.</param>
  43. /// <param name="dueTime">Relative time after which to execute the action.</param>
  44. /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  45. /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  46. public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
  47. {
  48. if (action == null)
  49. throw new ArgumentNullException("action");
  50. var dt = Scheduler.Normalize(dueTime);
  51. if (dt.Ticks > 0)
  52. {
  53. ConcurrencyAbstractionLayer.Current.Sleep(dt);
  54. }
  55. return action(new AsyncLockScheduler(), state);
  56. }
  57. class AsyncLockScheduler : LocalScheduler
  58. {
  59. AsyncLock asyncLock;
  60. public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
  61. {
  62. if (action == null)
  63. throw new ArgumentNullException("action");
  64. var m = new SingleAssignmentDisposable();
  65. if (asyncLock == null)
  66. asyncLock = new AsyncLock();
  67. asyncLock.Wait(() =>
  68. {
  69. if (!m.IsDisposed)
  70. m.Disposable = action(this, state);
  71. });
  72. return m;
  73. }
  74. public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
  75. {
  76. if (action == null)
  77. throw new ArgumentNullException("action");
  78. if (dueTime.Ticks <= 0)
  79. return Schedule<TState>(state, action);
  80. var timer = ConcurrencyAbstractionLayer.Current.StartStopwatch();
  81. var m = new SingleAssignmentDisposable();
  82. if (asyncLock == null)
  83. asyncLock = new AsyncLock();
  84. asyncLock.Wait(() =>
  85. {
  86. if (!m.IsDisposed)
  87. {
  88. var sleep = dueTime - timer.Elapsed;
  89. if (sleep.Ticks > 0)
  90. {
  91. ConcurrencyAbstractionLayer.Current.Sleep(sleep);
  92. }
  93. if (!m.IsDisposed)
  94. m.Disposable = action(this, state);
  95. }
  96. });
  97. return m;
  98. }
  99. }
  100. }
  101. }