1
0

ConcurrencyAbstractionLayerImpl.cs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_THREAD
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive.Concurrency
  8. {
  9. //
  10. // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms.
  11. // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator
  12. // behavior of Rx for PLIB when used on a more capable platform.
  13. //
  14. internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer
  15. {
  16. private sealed class WorkItem
  17. {
  18. public WorkItem(Action<object?> action, object? state)
  19. {
  20. Action = action;
  21. State = state;
  22. }
  23. public Action<object?> Action { get; }
  24. public object? State { get; }
  25. }
  26. public IDisposable StartTimer(Action<object?> action, object? state, TimeSpan dueTime) => new Timer(action, state, Normalize(dueTime));
  27. public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
  28. {
  29. if (period < TimeSpan.Zero)
  30. {
  31. throw new ArgumentOutOfRangeException(nameof(period));
  32. }
  33. //
  34. // The contract for periodic scheduling in Rx is that specifying TimeSpan.Zero as the period causes the scheduler to
  35. // call back periodically as fast as possible, sequentially.
  36. //
  37. if (period == TimeSpan.Zero)
  38. {
  39. return new FastPeriodicTimer(action);
  40. }
  41. return new PeriodicTimer(action, period);
  42. }
  43. public IDisposable QueueUserWorkItem(Action<object?> action, object? state)
  44. {
  45. ThreadPool.QueueUserWorkItem(itemObject =>
  46. {
  47. var item = (WorkItem)itemObject!;
  48. item.Action(item.State);
  49. }, new WorkItem(action, state));
  50. return Disposable.Empty;
  51. }
  52. public void Sleep(TimeSpan timeout) => Thread.Sleep(Normalize(timeout));
  53. public IStopwatch StartStopwatch() => new StopwatchImpl();
  54. public bool SupportsLongRunning => true;
  55. public void StartThread(Action<object?> action, object? state)
  56. {
  57. new Thread(itemObject =>
  58. {
  59. var item = (WorkItem)itemObject!;
  60. item.Action(item.State);
  61. })
  62. { IsBackground = true }.Start(new WorkItem(action, state));
  63. }
  64. private static TimeSpan Normalize(TimeSpan dueTime) => dueTime < TimeSpan.Zero ? TimeSpan.Zero : dueTime;
  65. //
  66. // Some historical context. In the early days of Rx, we discovered an issue with
  67. // the rooting of timers, causing them to get GC'ed even when the IDisposable of
  68. // a scheduled activity was kept alive. The original code simply created a timer
  69. // as follows:
  70. //
  71. // var t = default(Timer);
  72. // t = new Timer(_ =>
  73. // {
  74. // t = null;
  75. // Debug.WriteLine("Hello!");
  76. // }, null, 5000, Timeout.Infinite);
  77. //
  78. // IIRC the reference to "t" captured by the closure wasn't sufficient on .NET CF
  79. // to keep the timer rooted, causing problems on Windows Phone 7. As a result, we
  80. // added rooting code using a dictionary (SD 7280), which we carried forward all
  81. // the way to Rx v2.0 RTM.
  82. //
  83. // However, the desktop CLR's implementation of System.Threading.Timer exhibits
  84. // other characteristics where a timer can root itself when the timer is still
  85. // reachable through the state or callback parameters. To illustrate this, run
  86. // the following piece of code:
  87. //
  88. // static void Main()
  89. // {
  90. // Bar();
  91. //
  92. // while (true)
  93. // {
  94. // GC.Collect();
  95. // GC.WaitForPendingFinalizers();
  96. // Thread.Sleep(100);
  97. // }
  98. // }
  99. //
  100. // static void Bar()
  101. // {
  102. // var t = default(Timer);
  103. // t = new Timer(_ =>
  104. // {
  105. // t = null; // Comment out this line to see the timer stop
  106. // Console.WriteLine("Hello!");
  107. // }, null, 5000, Timeout.Infinite);
  108. // }
  109. //
  110. // When the closure over "t" is removed, the timer will stop automatically upon
  111. // garbage collection. However, when retaining the reference, this problem does
  112. // not exist. The code below exploits this behavior, avoiding unnecessary costs
  113. // to root timers in a thread-safe manner.
  114. //
  115. // Below is a fragment of SOS output, proving the proper rooting:
  116. //
  117. // !gcroot 02492440
  118. // HandleTable:
  119. // 005a13fc (pinned handle)
  120. // -> 03491010 System.Object[]
  121. // -> 024924dc System.Threading.TimerQueue
  122. // -> 02492450 System.Threading.TimerQueueTimer
  123. // -> 02492420 System.Threading.TimerCallback
  124. // -> 02492414 TimerRootingExperiment.Program+<>c__DisplayClass1
  125. // -> 02492440 System.Threading.Timer
  126. //
  127. // With the USE_TIMER_SELF_ROOT symbol, we shake off this additional rooting code
  128. // for newer platforms where this no longer needed. We checked this on .NET Core
  129. // as well as .NET 4.0, and only #define this symbol for those platforms.
  130. //
  131. // NB: 4/13/2017 - All target platforms for the 4.x release have the self-rooting
  132. // behavior described here, so we removed the USE_TIMER_SELF_ROOT
  133. // symbol.
  134. //
  135. private sealed class Timer : IDisposable
  136. {
  137. private volatile object? _state;
  138. private Action<object?> _action;
  139. private IDisposable _timer;
  140. private static readonly object DisposedState = new object();
  141. public Timer(Action<object?> action, object? state, TimeSpan dueTime)
  142. {
  143. _state = state;
  144. _action = action;
  145. Disposable.SetSingle(ref _timer, new System.Threading.Timer(static @this => Tick(@this!), this, dueTime, TimeSpan.FromMilliseconds(Timeout.Infinite)));
  146. }
  147. private static void Tick(object state)
  148. {
  149. var timer = (Timer)state;
  150. try
  151. {
  152. var timerState = timer._state;
  153. if (timerState != DisposedState)
  154. {
  155. timer._action(timerState);
  156. }
  157. }
  158. finally
  159. {
  160. Disposable.Dispose(ref timer._timer);
  161. }
  162. }
  163. public void Dispose()
  164. {
  165. if (Disposable.TryDispose(ref _timer))
  166. {
  167. _action = Stubs<object?>.Ignore;
  168. _state = DisposedState;
  169. }
  170. }
  171. }
  172. private sealed class PeriodicTimer : IDisposable
  173. {
  174. private Action _action;
  175. private volatile System.Threading.Timer? _timer;
  176. public PeriodicTimer(Action action, TimeSpan period)
  177. {
  178. _action = action;
  179. //
  180. // Rooting of the timer happens through the timer's state
  181. // which is the current instance and has a field to store the Timer instance.
  182. //
  183. _timer = new System.Threading.Timer(static @this => Tick(@this!), this, period, period);
  184. }
  185. private static void Tick(object state)
  186. {
  187. var timer = (PeriodicTimer)state;
  188. timer._action();
  189. }
  190. public void Dispose()
  191. {
  192. var timer = _timer;
  193. if (timer != null)
  194. {
  195. _action = Stubs.Nop;
  196. _timer = null;
  197. timer.Dispose();
  198. }
  199. }
  200. }
  201. private sealed class FastPeriodicTimer : IDisposable
  202. {
  203. private readonly Action _action;
  204. private volatile bool _disposed;
  205. public FastPeriodicTimer(Action action)
  206. {
  207. _action = action;
  208. new Thread(static @this => Loop(@this!))
  209. {
  210. Name = "Rx-FastPeriodicTimer",
  211. IsBackground = true
  212. }
  213. .Start(this);
  214. }
  215. private static void Loop(object threadParam)
  216. {
  217. var timer = (FastPeriodicTimer)threadParam;
  218. while (!timer._disposed)
  219. {
  220. timer._action();
  221. }
  222. }
  223. public void Dispose()
  224. {
  225. _disposed = true;
  226. }
  227. }
  228. }
  229. }
  230. #endif