manager.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package inbound
  2. import (
  3. "context"
  4. "os"
  5. "sync"
  6. "time"
  7. "github.com/sagernet/sing-box/adapter"
  8. "github.com/sagernet/sing-box/common/taskmonitor"
  9. C "github.com/sagernet/sing-box/constant"
  10. "github.com/sagernet/sing-box/log"
  11. "github.com/sagernet/sing/common"
  12. E "github.com/sagernet/sing/common/exceptions"
  13. F "github.com/sagernet/sing/common/format"
  14. )
  15. var _ adapter.InboundManager = (*Manager)(nil)
  16. type Manager struct {
  17. logger log.ContextLogger
  18. registry adapter.InboundRegistry
  19. endpoint adapter.EndpointManager
  20. access sync.Mutex
  21. started bool
  22. stage adapter.StartStage
  23. inbounds []adapter.Inbound
  24. inboundByTag map[string]adapter.Inbound
  25. }
  26. func NewManager(logger log.ContextLogger, registry adapter.InboundRegistry, endpoint adapter.EndpointManager) *Manager {
  27. return &Manager{
  28. logger: logger,
  29. registry: registry,
  30. endpoint: endpoint,
  31. inboundByTag: make(map[string]adapter.Inbound),
  32. }
  33. }
  34. func (m *Manager) Start(stage adapter.StartStage) error {
  35. m.access.Lock()
  36. if m.started && m.stage >= stage {
  37. panic("already started")
  38. }
  39. m.started = true
  40. m.stage = stage
  41. inbounds := m.inbounds
  42. m.access.Unlock()
  43. for _, inbound := range inbounds {
  44. name := "inbound/" + inbound.Type() + "[" + inbound.Tag() + "]"
  45. m.logger.Trace(stage, " ", name)
  46. startTime := time.Now()
  47. err := adapter.LegacyStart(inbound, stage)
  48. if err != nil {
  49. return E.Cause(err, stage, " ", name)
  50. }
  51. m.logger.Trace(stage, " ", name, " completed (", F.Seconds(time.Since(startTime).Seconds()), "s)")
  52. }
  53. return nil
  54. }
  55. func (m *Manager) Close() error {
  56. m.access.Lock()
  57. defer m.access.Unlock()
  58. if !m.started {
  59. return nil
  60. }
  61. m.started = false
  62. inbounds := m.inbounds
  63. m.inbounds = nil
  64. monitor := taskmonitor.New(m.logger, C.StopTimeout)
  65. var err error
  66. for _, inbound := range inbounds {
  67. name := "inbound/" + inbound.Type() + "[" + inbound.Tag() + "]"
  68. m.logger.Trace("close ", name)
  69. startTime := time.Now()
  70. monitor.Start("close ", name)
  71. err = E.Append(err, inbound.Close(), func(err error) error {
  72. return E.Cause(err, "close ", name)
  73. })
  74. monitor.Finish()
  75. m.logger.Trace("close ", name, " completed (", F.Seconds(time.Since(startTime).Seconds()), "s)")
  76. }
  77. return nil
  78. }
  79. func (m *Manager) Inbounds() []adapter.Inbound {
  80. m.access.Lock()
  81. defer m.access.Unlock()
  82. return m.inbounds
  83. }
  84. func (m *Manager) Get(tag string) (adapter.Inbound, bool) {
  85. m.access.Lock()
  86. inbound, found := m.inboundByTag[tag]
  87. m.access.Unlock()
  88. if found {
  89. return inbound, true
  90. }
  91. return m.endpoint.Get(tag)
  92. }
  93. func (m *Manager) Remove(tag string) error {
  94. m.access.Lock()
  95. inbound, found := m.inboundByTag[tag]
  96. if !found {
  97. m.access.Unlock()
  98. return os.ErrInvalid
  99. }
  100. delete(m.inboundByTag, tag)
  101. index := common.Index(m.inbounds, func(it adapter.Inbound) bool {
  102. return it == inbound
  103. })
  104. if index == -1 {
  105. panic("invalid inbound index")
  106. }
  107. m.inbounds = append(m.inbounds[:index], m.inbounds[index+1:]...)
  108. started := m.started
  109. m.access.Unlock()
  110. if started {
  111. return inbound.Close()
  112. }
  113. return nil
  114. }
  115. func (m *Manager) Create(ctx context.Context, router adapter.Router, logger log.ContextLogger, tag string, outboundType string, options any) error {
  116. inbound, err := m.registry.Create(ctx, router, logger, tag, outboundType, options)
  117. if err != nil {
  118. return err
  119. }
  120. m.access.Lock()
  121. defer m.access.Unlock()
  122. if m.started {
  123. name := "inbound/" + inbound.Type() + "[" + inbound.Tag() + "]"
  124. for _, stage := range adapter.ListStartStages {
  125. m.logger.Trace(stage, " ", name)
  126. startTime := time.Now()
  127. err = adapter.LegacyStart(inbound, stage)
  128. if err != nil {
  129. return E.Cause(err, stage, " ", name)
  130. }
  131. m.logger.Trace(stage, " ", name, " completed (", F.Seconds(time.Since(startTime).Seconds()), "s)")
  132. }
  133. }
  134. if existsInbound, loaded := m.inboundByTag[tag]; loaded {
  135. if m.started {
  136. err = existsInbound.Close()
  137. if err != nil {
  138. return E.Cause(err, "close inbound/", existsInbound.Type(), "[", existsInbound.Tag(), "]")
  139. }
  140. }
  141. existsIndex := common.Index(m.inbounds, func(it adapter.Inbound) bool {
  142. return it == existsInbound
  143. })
  144. if existsIndex == -1 {
  145. panic("invalid inbound index")
  146. }
  147. m.inbounds = append(m.inbounds[:existsIndex], m.inbounds[existsIndex+1:]...)
  148. }
  149. m.inbounds = append(m.inbounds, inbound)
  150. m.inboundByTag[tag] = inbound
  151. return nil
  152. }