// CatchScheduler.cs
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
  4. #nullable disable
  5. using System.Reactive.Disposables;
  6. using System.Runtime.CompilerServices;
  7. namespace System.Reactive.Concurrency
  8. {
  9. internal sealed class CatchScheduler<TException> : SchedulerWrapper
  10. where TException : Exception
  11. {
  12. private readonly Func<TException, bool> _handler;
  /// <summary>
  /// Creates a wrapper around <paramref name="scheduler"/> that offers exceptions of type
  /// <typeparamref name="TException"/> to <paramref name="handler"/>.
  /// </summary>
  /// <param name="scheduler">Scheduler handed to the base wrapper.</param>
  /// <param name="handler">Predicate stored and later consulted by the exception filters of this class.</param>
  public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler)
      : base(scheduler) => _handler = handler;
  /// <summary>
  /// Produces a delegate that runs <paramref name="action"/> against the recursive wrapper of the
  /// scheduler it is invoked with, guarding the call with the exception handler.
  /// </summary>
  /// <param name="action">The scheduled work to guard.</param>
  /// <returns>
  /// A delegate that returns the result of <paramref name="action"/>, or <c>Disposable.Empty</c> when
  /// a <typeparamref name="TException"/> was thrown and the handler returned <c>true</c>.
  /// </returns>
  protected override Func<IScheduler, TState, IDisposable> Wrap<TState>(Func<IScheduler, TState, IDisposable> action)
  {
      return Guarded;

      IDisposable Guarded(IScheduler innerScheduler, TState workState)
      {
          try
          {
              var recursive = GetRecursiveWrapper(innerScheduler);
              return action(recursive, workState);
          }
          // The filter decides whether the exception is handled; when the handler
          // returns false the exception keeps propagating to the caller.
          catch (TException error) when (_handler(error))
          {
              return Disposable.Empty;
          }
      }
  }
  /// <summary>
  /// Creates a wrapper around <paramref name="scheduler"/> that passes <paramref name="cache"/>
  /// on to the base wrapper and keeps <paramref name="handler"/> for exception filtering.
  /// </summary>
  /// <param name="scheduler">Scheduler handed to the base wrapper.</param>
  /// <param name="handler">Predicate stored and later consulted by the exception filters of this class.</param>
  /// <param name="cache">Scheduler-to-wrapper table handed to the base wrapper.</param>
  public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler, ConditionalWeakTable<IScheduler, IScheduler> cache)
      : base(scheduler, cache) => _handler = handler;
  /// <summary>
  /// Builds another <see cref="CatchScheduler{TException}"/> over <paramref name="scheduler"/>
  /// that uses the same handler as this instance and the supplied <paramref name="cache"/>.
  /// </summary>
  protected override SchedulerWrapper Clone(IScheduler scheduler, ConditionalWeakTable<IScheduler, IScheduler> cache)
      => new CatchScheduler<TException>(scheduler, _handler, cache);
  /// <summary>
  /// Resolves <paramref name="serviceType"/> from <paramref name="provider"/> and, for the
  /// long-running and periodic scheduler services, wraps the result in the catching decorator.
  /// </summary>
  /// <param name="provider">Provider queried for the service.</param>
  /// <param name="serviceType">Type of the requested service.</param>
  /// <param name="service">
  /// The resolved service (possibly wrapped), or <c>null</c> when the provider returned none.
  /// </param>
  /// <returns>Always <c>true</c>.</returns>
  protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object service)
  {
      service = provider.GetService(serviceType);

      // Nothing to decorate when the provider has no such service.
      if (service == null)
      {
          return true;
      }

      if (serviceType == typeof(ISchedulerPeriodic))
      {
          var periodic = (ISchedulerPeriodic)service;
          service = new CatchSchedulerPeriodic(periodic, _handler);
      }
      else if (serviceType == typeof(ISchedulerLongRunning))
      {
          var longRunning = (ISchedulerLongRunning)service;
          service = new CatchSchedulerLongRunning(longRunning, _handler);
      }

      // Any other service type is handed back exactly as the provider returned it.
      return true;
  }
  /// <summary>
  /// Decorates an <see cref="ISchedulerLongRunning"/> so that exceptions of type <c>TException</c>
  /// escaping the long-running action are offered to the handler.
  /// </summary>
  /// <remarks>
  /// Sealed to match the sibling periodic decorator; being private and nested, it has no derived types.
  /// </remarks>
  private sealed class CatchSchedulerLongRunning : ISchedulerLongRunning
  {
      private readonly ISchedulerLongRunning _scheduler;
      private readonly Func<TException, bool> _handler;

      /// <summary>Stores the decorated scheduler and the exception handler.</summary>
      /// <param name="scheduler">Long-running scheduler that work is forwarded to.</param>
      /// <param name="handler">Predicate consulted by the exception filter.</param>
      public CatchSchedulerLongRunning(ISchedulerLongRunning scheduler, Func<TException, bool> handler)
      {
          _scheduler = scheduler;
          _handler = handler;
      }

      /// <summary>
      /// Forwards the work to the decorated scheduler, running <paramref name="action"/> inside a
      /// try/catch guarded by the handler.
      /// </summary>
      /// <param name="state">State passed through to <paramref name="action"/>.</param>
      /// <param name="action">Long-running work to execute.</param>
      /// <returns>The disposable returned by the decorated scheduler.</returns>
      public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
      {
          // Note that avoiding closure allocation here would introduce infinite generic recursion over the TState argument
          return _scheduler.ScheduleLongRunning(
              state,
              (state1, cancel) =>
              {
                  try
                  {
                      action(state1, cancel);
                  }
                  catch (TException exception) when (_handler(exception))
                  {
                      // Intentionally empty: the handler returned true, so the exception is
                      // treated as handled. When it returns false the filter does not match
                      // and the exception propagates.
                  }
              });
      }
  }
  /// <summary>
  /// Decorates an <see cref="ISchedulerPeriodic"/> so that exceptions of type <c>TException</c>
  /// thrown by the periodic action are offered to the handler.
  /// </summary>
  private sealed class CatchSchedulerPeriodic : ISchedulerPeriodic
  {
      private readonly ISchedulerPeriodic _scheduler;
      private readonly Func<TException, bool> _handler;

      /// <summary>Stores the decorated scheduler and the exception handler.</summary>
      /// <param name="scheduler">Periodic scheduler that work is forwarded to.</param>
      /// <param name="handler">Predicate consulted when the periodic action throws.</param>
      public CatchSchedulerPeriodic(ISchedulerPeriodic scheduler, Func<TException, bool> handler)
      {
          _handler = handler;
          _scheduler = scheduler;
      }

      /// <summary>
      /// Schedules <paramref name="action"/> periodically through a work item that guards every tick.
      /// </summary>
      /// <returns>The work item; disposing it disposes the underlying periodic subscription.</returns>
      public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
          => new PeriodicallyScheduledWorkItem<TState>(this, state, period, action);

      /// <summary>
      /// A single periodic registration on the decorated scheduler, together with its failure flag.
      /// </summary>
      private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
      {
          private readonly CatchSchedulerPeriodic _owner;
          private readonly Func<TState, TState> _userAction;
          private IDisposable _subscription;
          private bool _faulted;

          public PeriodicallyScheduledWorkItem(CatchSchedulerPeriodic scheduler, TState state, TimeSpan period, Func<TState, TState> action)
          {
              // Tick reads both fields, so they are assigned before the work is scheduled.
              _owner = scheduler;
              _userAction = action;

              // Note that avoiding closure allocation here would introduce infinite generic recursion over the TState argument
              var inner = scheduler._scheduler.SchedulePeriodic(state, period, current => Tick(current));
              Disposable.SetSingle(ref _subscription, inner);
          }

          /// <summary>Disposes the underlying periodic subscription.</summary>
          public void Dispose() => Disposable.TryDispose(ref _subscription);

          /// <summary>
          /// Runs one iteration of the user action and returns the state for the next iteration.
          /// </summary>
          /// <returns>
          /// The state produced by the user action, or the default value of <c>TState</c> when the
          /// item has already failed or the handler accepted the exception.
          /// </returns>
          private TState Tick(TState state)
          {
              //
              // Cancellation may not be granted immediately; prevent from running user
              // code in that case. Periodic schedulers are assumed to introduce some
              // degree of concurrency, so we should return from the SchedulePeriodic
              // call eventually, allowing the TryDispose call in the catch block to
              // take effect.
              //
              if (!_faulted)
              {
                  try
                  {
                      return _userAction(state);
                  }
                  catch (TException error)
                  {
                      // The flag is raised before consulting the handler, so later ticks are
                      // suppressed whether or not the exception ends up being rethrown.
                      _faulted = true;

                      if (_owner._handler(error))
                      {
                          Disposable.TryDispose(ref _subscription);
                          return default;
                      }

                      throw;
                  }
              }

              return default;
          }
      }
  }
  143. }
  144. }