conn_monitor.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package route
  2. import (
  3. "context"
  4. "io"
  5. "reflect"
  6. "sync"
  7. "time"
  8. N "github.com/sagernet/sing/common/network"
  9. "github.com/sagernet/sing/common/x/list"
  10. )
  11. type ConnectionMonitor struct {
  12. access sync.RWMutex
  13. reloadChan chan struct{}
  14. connections list.List[*monitorEntry]
  15. }
  // monitorEntry pairs a context with the resource tied to it.
  type monitorEntry struct {
  	// ctx is the context whose Done channel the monitor goroutine waits on.
  	ctx context.Context
  	// closer is closed by the monitor once ctx is done, or when the whole
  	// monitor is closed.
  	closer io.Closer
  }
  20. func NewConnectionMonitor() *ConnectionMonitor {
  21. return &ConnectionMonitor{
  22. reloadChan: make(chan struct{}, 1),
  23. }
  24. }
  25. func (m *ConnectionMonitor) Add(ctx context.Context, closer io.Closer) N.CloseHandlerFunc {
  26. m.access.Lock()
  27. defer m.access.Unlock()
  28. element := m.connections.PushBack(&monitorEntry{
  29. ctx: ctx,
  30. closer: closer,
  31. })
  32. select {
  33. case <-m.reloadChan:
  34. return nil
  35. default:
  36. select {
  37. case m.reloadChan <- struct{}{}:
  38. default:
  39. }
  40. }
  41. return func(it error) {
  42. m.access.Lock()
  43. defer m.access.Unlock()
  44. m.connections.Remove(element)
  45. select {
  46. case <-m.reloadChan:
  47. default:
  48. select {
  49. case m.reloadChan <- struct{}{}:
  50. default:
  51. }
  52. }
  53. }
  54. }
  55. func (m *ConnectionMonitor) Start() error {
  56. go m.monitor()
  57. return nil
  58. }
  59. func (m *ConnectionMonitor) Close() error {
  60. m.access.Lock()
  61. defer m.access.Unlock()
  62. close(m.reloadChan)
  63. for element := m.connections.Front(); element != nil; element = element.Next() {
  64. element.Value.closer.Close()
  65. }
  66. return nil
  67. }
  68. func (m *ConnectionMonitor) monitor() {
  69. var (
  70. selectCases []reflect.SelectCase
  71. elements []*list.Element[*monitorEntry]
  72. )
  73. rootCase := reflect.SelectCase{
  74. Dir: reflect.SelectRecv,
  75. Chan: reflect.ValueOf(m.reloadChan),
  76. }
  77. for {
  78. m.access.RLock()
  79. if m.connections.Len() == 0 {
  80. m.access.RUnlock()
  81. if _, loaded := <-m.reloadChan; !loaded {
  82. return
  83. } else {
  84. continue
  85. }
  86. }
  87. if len(elements) < m.connections.Len() {
  88. elements = make([]*list.Element[*monitorEntry], 0, m.connections.Len())
  89. }
  90. if len(selectCases) < m.connections.Len()+1 {
  91. selectCases = make([]reflect.SelectCase, 0, m.connections.Len()+1)
  92. }
  93. elements = elements[:0]
  94. selectCases = selectCases[:1]
  95. selectCases[0] = rootCase
  96. for element := m.connections.Front(); element != nil; element = element.Next() {
  97. elements = append(elements, element)
  98. selectCases = append(selectCases, reflect.SelectCase{
  99. Dir: reflect.SelectRecv,
  100. Chan: reflect.ValueOf(element.Value.ctx.Done()),
  101. })
  102. }
  103. m.access.RUnlock()
  104. selected, _, loaded := reflect.Select(selectCases)
  105. if selected == 0 {
  106. if !loaded {
  107. return
  108. } else {
  109. time.Sleep(time.Second)
  110. continue
  111. }
  112. }
  113. element := elements[selected-1]
  114. m.access.Lock()
  115. m.connections.Remove(element)
  116. m.access.Unlock()
  117. element.Value.closer.Close() // maybe go close
  118. }
  119. }