| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Reactive.Disposables;
- using System.Runtime.CompilerServices;
- namespace System.Reactive.Concurrency
- {
- internal sealed class CatchScheduler<TException> : SchedulerWrapper
- where TException : Exception
- {
- private readonly Func<TException, bool> _handler;
- public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler)
- : base(scheduler)
- {
- _handler = handler;
- }
- protected override Func<IScheduler, TState, IDisposable> Wrap<TState>(Func<IScheduler, TState, IDisposable> action)
- {
- return (self, state) =>
- {
- try
- {
- return action(GetRecursiveWrapper(self), state);
- }
- catch (TException exception) when (_handler(exception))
- {
- return Disposable.Empty;
- }
- };
- }
- public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler, ConditionalWeakTable<IScheduler, IScheduler> cache)
- : base(scheduler, cache)
- {
- _handler = handler;
- }
- protected override SchedulerWrapper Clone(IScheduler scheduler, ConditionalWeakTable<IScheduler, IScheduler> cache)
- {
- return new CatchScheduler<TException>(scheduler, _handler, cache);
- }
- protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object? service)
- {
- service = provider.GetService(serviceType);
- if (service != null)
- {
- if (serviceType == typeof(ISchedulerLongRunning))
- {
- service = new CatchSchedulerLongRunning((ISchedulerLongRunning)service, _handler);
- }
- else if (serviceType == typeof(ISchedulerPeriodic))
- {
- service = new CatchSchedulerPeriodic((ISchedulerPeriodic)service, _handler);
- }
- }
- return true;
- }
- private class CatchSchedulerLongRunning : ISchedulerLongRunning
- {
- private readonly ISchedulerLongRunning _scheduler;
- private readonly Func<TException, bool> _handler;
- public CatchSchedulerLongRunning(ISchedulerLongRunning scheduler, Func<TException, bool> handler)
- {
- _scheduler = scheduler;
- _handler = handler;
- }
- public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
- {
- // Note that avoiding closure allocation here would introduce infinite generic recursion over the TState argument
-
- return _scheduler.ScheduleLongRunning(
- state,
- (state1, cancel) =>
- {
- try
- {
- action(state1, cancel);
- }
- catch (TException exception) when (_handler(exception))
- {
- }
- });
- }
- }
- private sealed class CatchSchedulerPeriodic : ISchedulerPeriodic
- {
- private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
- {
- private IDisposable _cancel;
- private bool _failed;
- private readonly Func<TState, TState> _action;
- private readonly CatchSchedulerPeriodic _catchScheduler;
- public PeriodicallyScheduledWorkItem(CatchSchedulerPeriodic scheduler, TState state, TimeSpan period, Func<TState, TState> action)
- {
- _catchScheduler = scheduler;
- _action = action;
- // Note that avoiding closure allocation here would introduce infinite generic recursion over the TState argument
- Disposable.SetSingle(ref _cancel, scheduler._scheduler.SchedulePeriodic(state, period, state1 => Tick(state1).state));
- }
- public void Dispose()
- {
- Disposable.TryDispose(ref _cancel);
- }
- private (PeriodicallyScheduledWorkItem<TState> @this, TState state) Tick(TState state)
- {
- //
- // Cancellation may not be granted immediately; prevent from running user
- // code in that case. Periodic schedulers are assumed to introduce some
- // degree of concurrency, so we should return from the SchedulePeriodic
- // call eventually, allowing the d.Dispose() call in the catch block to
- // take effect.
- //
- if (_failed)
- {
- return default;
- }
- try
- {
- return (this, _action(state));
- }
- catch (TException exception)
- {
- _failed = true;
- if (!_catchScheduler._handler(exception))
- {
- throw;
- }
- Disposable.TryDispose(ref _cancel);
- return default;
- }
- }
- }
- private readonly ISchedulerPeriodic _scheduler;
- private readonly Func<TException, bool> _handler;
- public CatchSchedulerPeriodic(ISchedulerPeriodic scheduler, Func<TException, bool> handler)
- {
- _scheduler = scheduler;
- _handler = handler;
- }
- public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
- {
- return new PeriodicallyScheduledWorkItem<TState>(this, state, period, action);
- }
- }
- }
- }
|