CatchScheduler.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Reactive.Disposables;
  4. #if !NO_WEAKTABLE
  5. using System.Runtime.CompilerServices;
  6. #endif
  7. namespace System.Reactive.Concurrency
  8. {
  9. class CatchScheduler<TException> : SchedulerWrapper
  10. where TException : Exception
  11. {
  12. private readonly Func<TException, bool> _handler;
  13. public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler)
  14. : base(scheduler)
  15. {
  16. _handler = handler;
  17. }
  18. protected override Func<IScheduler, TState, IDisposable> Wrap<TState>(Func<IScheduler, TState, IDisposable> action)
  19. {
  20. return (self, state) =>
  21. {
  22. try
  23. {
  24. return action(GetRecursiveWrapper(self), state);
  25. }
  26. catch (TException exception)
  27. {
  28. if (!_handler(exception))
  29. throw;
  30. return Disposable.Empty;
  31. }
  32. };
  33. }
  34. #if !NO_WEAKTABLE
  35. public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler, ConditionalWeakTable<IScheduler, IScheduler> cache)
  36. : base(scheduler, cache)
  37. {
  38. _handler = handler;
  39. }
  40. protected override SchedulerWrapper Clone(IScheduler scheduler, ConditionalWeakTable<IScheduler, IScheduler> cache)
  41. {
  42. return new CatchScheduler<TException>(scheduler, _handler, cache);
  43. }
  44. #else
  45. protected override SchedulerWrapper Clone(IScheduler scheduler)
  46. {
  47. return new CatchScheduler<TException>(scheduler, _handler);
  48. }
  49. #endif
  50. protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object service)
  51. {
  52. service = provider.GetService(serviceType);
  53. if (service != null)
  54. {
  55. if (serviceType == typeof(ISchedulerLongRunning))
  56. service = new CatchSchedulerLongRunning((ISchedulerLongRunning)service, _handler);
  57. else if (serviceType == typeof(ISchedulerPeriodic))
  58. service = new CatchSchedulerPeriodic((ISchedulerPeriodic)service, _handler);
  59. }
  60. return true;
  61. }
  62. class CatchSchedulerLongRunning : ISchedulerLongRunning
  63. {
  64. private readonly ISchedulerLongRunning _scheduler;
  65. private readonly Func<TException, bool> _handler;
  66. public CatchSchedulerLongRunning(ISchedulerLongRunning scheduler, Func<TException, bool> handler)
  67. {
  68. _scheduler = scheduler;
  69. _handler = handler;
  70. }
  71. public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
  72. {
  73. return _scheduler.ScheduleLongRunning(state, (state_, cancel) =>
  74. {
  75. try
  76. {
  77. action(state_, cancel);
  78. }
  79. catch (TException exception)
  80. {
  81. if (!_handler(exception))
  82. throw;
  83. }
  84. });
  85. }
  86. }
  87. class CatchSchedulerPeriodic : ISchedulerPeriodic
  88. {
  89. private readonly ISchedulerPeriodic _scheduler;
  90. private readonly Func<TException, bool> _handler;
  91. public CatchSchedulerPeriodic(ISchedulerPeriodic scheduler, Func<TException, bool> handler)
  92. {
  93. _scheduler = scheduler;
  94. _handler = handler;
  95. }
  96. public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
  97. {
  98. var failed = false;
  99. var d = new SingleAssignmentDisposable();
  100. d.Disposable = _scheduler.SchedulePeriodic(state, period, state_ =>
  101. {
  102. //
  103. // Cancellation may not be granted immediately; prevent from running user
  104. // code in that case. Periodic schedulers are assumed to introduce some
  105. // degree of concurrency, so we should return from the SchedulePeriodic
  106. // call eventually, allowing the d.Dispose() call in the catch block to
  107. // take effect.
  108. //
  109. if (failed)
  110. return default(TState);
  111. try
  112. {
  113. return action(state_);
  114. }
  115. catch (TException exception)
  116. {
  117. failed = true;
  118. if (!_handler(exception))
  119. throw;
  120. d.Dispose();
  121. return default(TState);
  122. }
  123. });
  124. return d;
  125. }
  126. }
  127. }
  128. }