1
0

CatchScheduler.cs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Reactive.Disposables;
  6. #if !NO_WEAKTABLE
  7. using System.Runtime.CompilerServices;
  8. #endif
  9. namespace System.Reactive.Concurrency
  10. {
  /// <summary>
  /// Scheduler wrapper that hands exceptions of type <typeparamref name="TException"/> thrown by
  /// scheduled work to a handler delegate. The exception is swallowed when the handler returns
  /// <c>true</c> and rethrown when it returns <c>false</c>.
  /// </summary>
  /// <typeparam name="TException">Type of exception to intercept.</typeparam>
  class CatchScheduler<TException> : SchedulerWrapper
      where TException : Exception
  {
      // Decides, per caught exception, whether it is considered handled (true) or must propagate (false).
      private readonly Func<TException, bool> _handler;

      /// <summary>
      /// Creates a catching wrapper around the given scheduler.
      /// </summary>
      /// <param name="scheduler">Scheduler to wrap; forwarded to the base class.</param>
      /// <param name="handler">Delegate invoked with each caught <typeparamref name="TException"/>.</param>
      public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler)
          : base(scheduler)
      {
          _handler = handler;
      }

#if !NO_WEAKTABLE
      /// <summary>
      /// Creates a catching wrapper around the given scheduler, passing an existing cache table
      /// on to the base class. Only compiled when NO_WEAKTABLE is not defined.
      /// </summary>
      /// <param name="scheduler">Scheduler to wrap; forwarded to the base class.</param>
      /// <param name="handler">Delegate invoked with each caught <typeparamref name="TException"/>.</param>
      /// <param name="cache">Cache table forwarded to the base class.</param>
      public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler, ConditionalWeakTable<IScheduler, IScheduler> cache)
          : base(scheduler, cache)
      {
          _handler = handler;
      }
#endif

      /// <summary>
      /// Decorates a scheduled action so that a <typeparamref name="TException"/> escaping it is
      /// offered to the handler.
      /// </summary>
      /// <param name="action">The action to protect.</param>
      /// <returns>
      /// A delegate that runs <paramref name="action"/> against the recursive wrapper of the scheduler
      /// it receives. It yields <see cref="Disposable.Empty"/> when a caught exception is handled.
      /// </returns>
      protected override Func<IScheduler, TState, IDisposable> Wrap<TState>(Func<IScheduler, TState, IDisposable> action)
      {
          Func<IScheduler, TState, IDisposable> guarded = (innerScheduler, innerState) =>
          {
              try
              {
                  // The wrapper lookup stays inside the try block, exactly like the action call itself.
                  var recursive = GetRecursiveWrapper(innerScheduler);
                  return action(recursive, innerState);
              }
              catch (TException error)
              {
                  if (_handler(error))
                  {
                      return Disposable.Empty;
                  }

                  // Not handled: let the original exception continue with its stack trace intact.
                  throw;
              }
          };

          return guarded;
      }

#if !NO_WEAKTABLE
      /// <summary>
      /// Produces another <see cref="CatchScheduler{TException}"/> over the specified scheduler that
      /// reuses this instance's handler and the supplied cache.
      /// </summary>
      protected override SchedulerWrapper Clone(IScheduler scheduler, ConditionalWeakTable<IScheduler, IScheduler> cache)
      {
          return new CatchScheduler<TException>(scheduler, _handler, cache);
      }
#else
      /// <summary>
      /// Produces another <see cref="CatchScheduler{TException}"/> over the specified scheduler that
      /// reuses this instance's handler.
      /// </summary>
      protected override SchedulerWrapper Clone(IScheduler scheduler)
      {
          return new CatchScheduler<TException>(scheduler, _handler);
      }
#endif

      /// <summary>
      /// Resolves a service from the given provider and, for the long-running and periodic
      /// scheduler services, substitutes a catching decorator around the resolved instance.
      /// </summary>
      /// <param name="provider">Provider queried for the service.</param>
      /// <param name="serviceType">Type of the requested service.</param>
      /// <param name="service">
      /// The resolved service (possibly decorated), or <c>null</c> when the provider returned <c>null</c>.
      /// </param>
      /// <returns>Always <c>true</c>.</returns>
      protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object service)
      {
          var resolved = provider.GetService(serviceType);
          service = resolved;

          if (resolved == null)
          {
              return true;
          }

          if (serviceType == typeof(ISchedulerLongRunning))
          {
              service = new CatchSchedulerLongRunning((ISchedulerLongRunning)resolved, _handler);
          }
          else if (serviceType == typeof(ISchedulerPeriodic))
          {
              service = new CatchSchedulerPeriodic((ISchedulerPeriodic)resolved, _handler);
          }

          return true;
      }

      /// <summary>
      /// Long-running scheduler decorator that offers a <typeparamref name="TException"/> thrown by
      /// the scheduled action to the handler.
      /// </summary>
      class CatchSchedulerLongRunning : ISchedulerLongRunning
      {
          private readonly ISchedulerLongRunning _scheduler;
          private readonly Func<TException, bool> _handler;

          public CatchSchedulerLongRunning(ISchedulerLongRunning scheduler, Func<TException, bool> handler)
          {
              _scheduler = scheduler;
              _handler = handler;
          }

          /// <summary>
          /// Schedules <paramref name="action"/> on the underlying scheduler, surrounded by the
          /// catch-and-handle logic.
          /// </summary>
          /// <returns>The disposable returned by the underlying scheduler.</returns>
          public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
          {
              Action<TState, ICancelable> guarded = (innerState, cancel) =>
              {
                  try
                  {
                      action(innerState, cancel);
                  }
                  catch (TException error)
                  {
                      if (_handler(error))
                      {
                          return;
                      }

                      throw;
                  }
              };

              return _scheduler.ScheduleLongRunning(state, guarded);
          }
      }

      /// <summary>
      /// Periodic scheduler decorator that offers a <typeparamref name="TException"/> thrown by the
      /// periodic action to the handler and, once the exception is handled, disposes the periodic work.
      /// </summary>
      class CatchSchedulerPeriodic : ISchedulerPeriodic
      {
          private readonly ISchedulerPeriodic _scheduler;
          private readonly Func<TException, bool> _handler;

          public CatchSchedulerPeriodic(ISchedulerPeriodic scheduler, Func<TException, bool> handler)
          {
              _scheduler = scheduler;
              _handler = handler;
          }

          /// <summary>
          /// Schedules <paramref name="action"/> periodically on the underlying scheduler. After the
          /// first caught <typeparamref name="TException"/>, later ticks no longer run the action.
          /// </summary>
          /// <returns>A disposable that owns the underlying periodic subscription.</returns>
          public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
          {
              var hasFailed = false;
              var subscription = new SingleAssignmentDisposable();

              Func<TState, TState> guarded = current =>
              {
                  //
                  // Disposal may not stop the timer right away, so the flag keeps user code from
                  // running again after a failure. Periodic schedulers are assumed to introduce
                  // some degree of concurrency, which means the SchedulePeriodic call below should
                  // eventually return and assign the inner disposable, allowing the Dispose() call
                  // in the catch block to take effect.
                  //
                  if (!hasFailed)
                  {
                      try
                      {
                          return action(current);
                      }
                      catch (TException error)
                      {
                          // Mark as failed before consulting the handler, whatever its verdict.
                          hasFailed = true;

                          if (!_handler(error))
                          {
                              throw;
                          }

                          subscription.Dispose();
                      }
                  }

                  return default(TState);
              };

              subscription.Disposable = _scheduler.SchedulePeriodic(state, period, guarded);
              return subscription;
          }
      }
  }
  130. }