manager.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package inbound
  2. import (
  3. "context"
  4. "os"
  5. "sync"
  6. "github.com/sagernet/sing-box/adapter"
  7. "github.com/sagernet/sing-box/common/taskmonitor"
  8. C "github.com/sagernet/sing-box/constant"
  9. "github.com/sagernet/sing-box/log"
  10. "github.com/sagernet/sing/common"
  11. E "github.com/sagernet/sing/common/exceptions"
  12. )
  13. var _ adapter.InboundManager = (*Manager)(nil)
  14. type Manager struct {
  15. logger log.ContextLogger
  16. registry adapter.InboundRegistry
  17. endpoint adapter.EndpointManager
  18. access sync.Mutex
  19. started bool
  20. stage adapter.StartStage
  21. inbounds []adapter.Inbound
  22. inboundByTag map[string]adapter.Inbound
  23. }
  24. func NewManager(logger log.ContextLogger, registry adapter.InboundRegistry, endpoint adapter.EndpointManager) *Manager {
  25. return &Manager{
  26. logger: logger,
  27. registry: registry,
  28. endpoint: endpoint,
  29. inboundByTag: make(map[string]adapter.Inbound),
  30. }
  31. }
  32. func (m *Manager) Start(stage adapter.StartStage) error {
  33. m.access.Lock()
  34. if m.started && m.stage >= stage {
  35. panic("already started")
  36. }
  37. m.started = true
  38. m.stage = stage
  39. inbounds := m.inbounds
  40. m.access.Unlock()
  41. for _, inbound := range inbounds {
  42. name := "inbound/" + inbound.Type() + "[" + inbound.Tag() + "]"
  43. done := adapter.LogElapsed(m.logger, stage, " ", name)
  44. err := adapter.LegacyStart(inbound, stage)
  45. done()
  46. if err != nil {
  47. return E.Cause(err, stage, " ", name)
  48. }
  49. }
  50. return nil
  51. }
  52. func (m *Manager) Close() error {
  53. m.access.Lock()
  54. defer m.access.Unlock()
  55. if !m.started {
  56. return nil
  57. }
  58. m.started = false
  59. inbounds := m.inbounds
  60. m.inbounds = nil
  61. monitor := taskmonitor.New(m.logger, C.StopTimeout)
  62. var err error
  63. for _, inbound := range inbounds {
  64. name := "inbound/" + inbound.Type() + "[" + inbound.Tag() + "]"
  65. done := adapter.LogElapsed(m.logger, "close ", name)
  66. monitor.Start("close ", name)
  67. err = E.Append(err, inbound.Close(), func(err error) error {
  68. return E.Cause(err, "close ", name)
  69. })
  70. monitor.Finish()
  71. done()
  72. }
  73. return nil
  74. }
  75. func (m *Manager) Inbounds() []adapter.Inbound {
  76. m.access.Lock()
  77. defer m.access.Unlock()
  78. return m.inbounds
  79. }
  80. func (m *Manager) Get(tag string) (adapter.Inbound, bool) {
  81. m.access.Lock()
  82. inbound, found := m.inboundByTag[tag]
  83. m.access.Unlock()
  84. if found {
  85. return inbound, true
  86. }
  87. return m.endpoint.Get(tag)
  88. }
  89. func (m *Manager) Remove(tag string) error {
  90. m.access.Lock()
  91. inbound, found := m.inboundByTag[tag]
  92. if !found {
  93. m.access.Unlock()
  94. return os.ErrInvalid
  95. }
  96. delete(m.inboundByTag, tag)
  97. index := common.Index(m.inbounds, func(it adapter.Inbound) bool {
  98. return it == inbound
  99. })
  100. if index == -1 {
  101. panic("invalid inbound index")
  102. }
  103. m.inbounds = append(m.inbounds[:index], m.inbounds[index+1:]...)
  104. started := m.started
  105. m.access.Unlock()
  106. if started {
  107. return inbound.Close()
  108. }
  109. return nil
  110. }
  111. func (m *Manager) Create(ctx context.Context, router adapter.Router, logger log.ContextLogger, tag string, outboundType string, options any) error {
  112. inbound, err := m.registry.Create(ctx, router, logger, tag, outboundType, options)
  113. if err != nil {
  114. return err
  115. }
  116. m.access.Lock()
  117. defer m.access.Unlock()
  118. if m.started {
  119. name := "inbound/" + inbound.Type() + "[" + inbound.Tag() + "]"
  120. for _, stage := range adapter.ListStartStages {
  121. done := adapter.LogElapsed(m.logger, stage, " ", name)
  122. err = adapter.LegacyStart(inbound, stage)
  123. done()
  124. if err != nil {
  125. return E.Cause(err, stage, " ", name)
  126. }
  127. }
  128. }
  129. if existsInbound, loaded := m.inboundByTag[tag]; loaded {
  130. if m.started {
  131. err = existsInbound.Close()
  132. if err != nil {
  133. return E.Cause(err, "close inbound/", existsInbound.Type(), "[", existsInbound.Tag(), "]")
  134. }
  135. }
  136. existsIndex := common.Index(m.inbounds, func(it adapter.Inbound) bool {
  137. return it == existsInbound
  138. })
  139. if existsIndex == -1 {
  140. panic("invalid inbound index")
  141. }
  142. m.inbounds = append(m.inbounds[:existsIndex], m.inbounds[existsIndex+1:]...)
  143. }
  144. m.inbounds = append(m.inbounds, inbound)
  145. m.inboundByTag[tag] = inbound
  146. return nil
  147. }