timer.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package signal
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/xtls/xray-core/common"
  8. "github.com/xtls/xray-core/common/task"
  9. )
  10. type ActivityUpdater interface {
  11. Update()
  12. }
  13. type ActivityTimer struct {
  14. mu sync.RWMutex
  15. updated chan struct{}
  16. checkTask *task.Periodic
  17. onTimeout func()
  18. consumed atomic.Bool
  19. once sync.Once
  20. }
  21. func (t *ActivityTimer) Update() {
  22. select {
  23. case t.updated <- struct{}{}:
  24. default:
  25. }
  26. }
  27. func (t *ActivityTimer) check() error {
  28. select {
  29. case <-t.updated:
  30. default:
  31. t.finish()
  32. }
  33. return nil
  34. }
  35. func (t *ActivityTimer) finish() {
  36. t.once.Do(func() {
  37. t.consumed.Store(true)
  38. t.mu.Lock()
  39. defer t.mu.Unlock()
  40. common.CloseIfExists(t.checkTask)
  41. t.onTimeout()
  42. })
  43. }
  44. func (t *ActivityTimer) SetTimeout(timeout time.Duration) {
  45. if t.consumed.Load() {
  46. return
  47. }
  48. if timeout == 0 {
  49. t.finish()
  50. return
  51. }
  52. t.mu.Lock()
  53. defer t.mu.Unlock()
  54. // double check, just in case
  55. if t.consumed.Load() {
  56. return
  57. }
  58. newCheckTask := &task.Periodic{
  59. Interval: timeout,
  60. Execute: t.check,
  61. }
  62. common.CloseIfExists(t.checkTask)
  63. t.checkTask = newCheckTask
  64. t.Update()
  65. common.Must(newCheckTask.Start())
  66. }
  67. func CancelAfterInactivity(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) *ActivityTimer {
  68. timer := &ActivityTimer{
  69. updated: make(chan struct{}, 1),
  70. onTimeout: cancel,
  71. }
  72. timer.SetTimeout(timeout)
  73. return timer
  74. }