transport_manager.go 8.4 KB


  1. package dns
  2. import (
  3. "context"
  4. "io"
  5. "os"
  6. "strings"
  7. "sync"
  8. "github.com/sagernet/sing-box/adapter"
  9. "github.com/sagernet/sing-box/common/taskmonitor"
  10. C "github.com/sagernet/sing-box/constant"
  11. "github.com/sagernet/sing-box/log"
  12. "github.com/sagernet/sing/common"
  13. E "github.com/sagernet/sing/common/exceptions"
  14. "github.com/sagernet/sing/common/logger"
  15. )
  16. var _ adapter.DNSTransportManager = (*TransportManager)(nil)
  17. type TransportManager struct {
  18. logger log.ContextLogger
  19. registry adapter.DNSTransportRegistry
  20. outbound adapter.OutboundManager
  21. defaultTag string
  22. access sync.RWMutex
  23. started bool
  24. stage adapter.StartStage
  25. transports []adapter.DNSTransport
  26. transportByTag map[string]adapter.DNSTransport
  27. dependByTag map[string][]string
  28. defaultTransport adapter.DNSTransport
  29. defaultTransportFallback adapter.DNSTransport
  30. fakeIPTransport adapter.FakeIPTransport
  31. }
  32. func NewTransportManager(logger logger.ContextLogger, registry adapter.DNSTransportRegistry, outbound adapter.OutboundManager, defaultTag string) *TransportManager {
  33. return &TransportManager{
  34. logger: logger,
  35. registry: registry,
  36. outbound: outbound,
  37. defaultTag: defaultTag,
  38. transportByTag: make(map[string]adapter.DNSTransport),
  39. dependByTag: make(map[string][]string),
  40. }
  41. }
  42. func (m *TransportManager) Initialize(defaultTransportFallback adapter.DNSTransport) {
  43. m.defaultTransportFallback = defaultTransportFallback
  44. }
  45. func (m *TransportManager) Start(stage adapter.StartStage) error {
  46. m.access.Lock()
  47. if m.started && m.stage >= stage {
  48. panic("already started")
  49. }
  50. m.started = true
  51. m.stage = stage
  52. outbounds := m.transports
  53. m.access.Unlock()
  54. if stage == adapter.StartStateStart {
  55. return m.startTransports(m.transports)
  56. } else {
  57. for _, outbound := range outbounds {
  58. err := adapter.LegacyStart(outbound, stage)
  59. if err != nil {
  60. return E.Cause(err, stage, " dns/", outbound.Type(), "[", outbound.Tag(), "]")
  61. }
  62. }
  63. }
  64. return nil
  65. }
  66. func (m *TransportManager) startTransports(transports []adapter.DNSTransport) error {
  67. monitor := taskmonitor.New(m.logger, C.StartTimeout)
  68. started := make(map[string]bool)
  69. for {
  70. canContinue := false
  71. startOne:
  72. for _, transportToStart := range transports {
  73. transportTag := transportToStart.Tag()
  74. if started[transportTag] {
  75. continue
  76. }
  77. dependencies := transportToStart.Dependencies()
  78. for _, dependency := range dependencies {
  79. if !started[dependency] {
  80. continue startOne
  81. }
  82. }
  83. started[transportTag] = true
  84. canContinue = true
  85. if starter, isStarter := transportToStart.(adapter.Lifecycle); isStarter {
  86. monitor.Start("start dns/", transportToStart.Type(), "[", transportTag, "]")
  87. err := starter.Start(adapter.StartStateStart)
  88. monitor.Finish()
  89. if err != nil {
  90. return E.Cause(err, "start dns/", transportToStart.Type(), "[", transportTag, "]")
  91. }
  92. }
  93. }
  94. if len(started) == len(transports) {
  95. break
  96. }
  97. if canContinue {
  98. continue
  99. }
  100. currentTransport := common.Find(transports, func(it adapter.DNSTransport) bool {
  101. return !started[it.Tag()]
  102. })
  103. var lintTransport func(oTree []string, oCurrent adapter.DNSTransport) error
  104. lintTransport = func(oTree []string, oCurrent adapter.DNSTransport) error {
  105. problemTransportTag := common.Find(oCurrent.Dependencies(), func(it string) bool {
  106. return !started[it]
  107. })
  108. if common.Contains(oTree, problemTransportTag) {
  109. return E.New("circular server dependency: ", strings.Join(oTree, " -> "), " -> ", problemTransportTag)
  110. }
  111. m.access.Lock()
  112. problemTransport := m.transportByTag[problemTransportTag]
  113. m.access.Unlock()
  114. if problemTransport == nil {
  115. return E.New("dependency[", problemTransportTag, "] not found for server[", oCurrent.Tag(), "]")
  116. }
  117. return lintTransport(append(oTree, problemTransportTag), problemTransport)
  118. }
  119. return lintTransport([]string{currentTransport.Tag()}, currentTransport)
  120. }
  121. return nil
  122. }
  123. func (m *TransportManager) Close() error {
  124. monitor := taskmonitor.New(m.logger, C.StopTimeout)
  125. m.access.Lock()
  126. if !m.started {
  127. m.access.Unlock()
  128. return nil
  129. }
  130. m.started = false
  131. transports := m.transports
  132. m.transports = nil
  133. m.access.Unlock()
  134. var err error
  135. for _, transport := range transports {
  136. if closer, isCloser := transport.(io.Closer); isCloser {
  137. monitor.Start("close server/", transport.Type(), "[", transport.Tag(), "]")
  138. err = E.Append(err, closer.Close(), func(err error) error {
  139. return E.Cause(err, "close server/", transport.Type(), "[", transport.Tag(), "]")
  140. })
  141. monitor.Finish()
  142. }
  143. }
  144. return nil
  145. }
  146. func (m *TransportManager) Transports() []adapter.DNSTransport {
  147. m.access.RLock()
  148. defer m.access.RUnlock()
  149. return m.transports
  150. }
  151. func (m *TransportManager) Transport(tag string) (adapter.DNSTransport, bool) {
  152. m.access.RLock()
  153. outbound, found := m.transportByTag[tag]
  154. m.access.RUnlock()
  155. return outbound, found
  156. }
  157. func (m *TransportManager) Default() adapter.DNSTransport {
  158. m.access.RLock()
  159. defer m.access.RUnlock()
  160. if m.defaultTransport != nil {
  161. return m.defaultTransport
  162. } else {
  163. return m.defaultTransportFallback
  164. }
  165. }
  166. func (m *TransportManager) FakeIP() adapter.FakeIPTransport {
  167. m.access.RLock()
  168. defer m.access.RUnlock()
  169. return m.fakeIPTransport
  170. }
  171. func (m *TransportManager) Remove(tag string) error {
  172. m.access.Lock()
  173. defer m.access.Unlock()
  174. transport, found := m.transportByTag[tag]
  175. if !found {
  176. return os.ErrInvalid
  177. }
  178. delete(m.transportByTag, tag)
  179. index := common.Index(m.transports, func(it adapter.DNSTransport) bool {
  180. return it == transport
  181. })
  182. if index == -1 {
  183. panic("invalid inbound index")
  184. }
  185. m.transports = append(m.transports[:index], m.transports[index+1:]...)
  186. started := m.started
  187. if m.defaultTransport == transport {
  188. if len(m.transports) > 0 {
  189. nextTransport := m.transports[0]
  190. if nextTransport.Type() != C.DNSTypeFakeIP {
  191. return E.New("default server cannot be fakeip")
  192. }
  193. m.defaultTransport = nextTransport
  194. m.logger.Info("updated default server to ", m.defaultTransport.Tag())
  195. } else {
  196. m.defaultTransport = nil
  197. }
  198. }
  199. dependBy := m.dependByTag[tag]
  200. if len(dependBy) > 0 {
  201. return E.New("server[", tag, "] is depended by ", strings.Join(dependBy, ", "))
  202. }
  203. dependencies := transport.Dependencies()
  204. for _, dependency := range dependencies {
  205. if len(m.dependByTag[dependency]) == 1 {
  206. delete(m.dependByTag, dependency)
  207. } else {
  208. m.dependByTag[dependency] = common.Filter(m.dependByTag[dependency], func(it string) bool {
  209. return it != tag
  210. })
  211. }
  212. }
  213. if started {
  214. transport.Reset()
  215. }
  216. return nil
  217. }
  218. func (m *TransportManager) Create(ctx context.Context, logger log.ContextLogger, tag string, transportType string, options any) error {
  219. if tag == "" {
  220. return os.ErrInvalid
  221. }
  222. transport, err := m.registry.CreateDNSTransport(ctx, logger, tag, transportType, options)
  223. if err != nil {
  224. return err
  225. }
  226. m.access.Lock()
  227. defer m.access.Unlock()
  228. if m.started {
  229. for _, stage := range adapter.ListStartStages {
  230. err = adapter.LegacyStart(transport, stage)
  231. if err != nil {
  232. return E.Cause(err, stage, " dns/", transport.Type(), "[", transport.Tag(), "]")
  233. }
  234. }
  235. }
  236. if existsTransport, loaded := m.transportByTag[tag]; loaded {
  237. if m.started {
  238. err = common.Close(existsTransport)
  239. if err != nil {
  240. return E.Cause(err, "close dns/", existsTransport.Type(), "[", existsTransport.Tag(), "]")
  241. }
  242. }
  243. existsIndex := common.Index(m.transports, func(it adapter.DNSTransport) bool {
  244. return it == existsTransport
  245. })
  246. if existsIndex == -1 {
  247. panic("invalid inbound index")
  248. }
  249. m.transports = append(m.transports[:existsIndex], m.transports[existsIndex+1:]...)
  250. }
  251. m.transports = append(m.transports, transport)
  252. m.transportByTag[tag] = transport
  253. dependencies := transport.Dependencies()
  254. for _, dependency := range dependencies {
  255. m.dependByTag[dependency] = append(m.dependByTag[dependency], tag)
  256. }
  257. if tag == m.defaultTag || (m.defaultTag == "" && m.defaultTransport == nil) {
  258. if transport.Type() == C.DNSTypeFakeIP {
  259. return E.New("default server cannot be fakeip")
  260. }
  261. m.defaultTransport = transport
  262. if m.started {
  263. m.logger.Info("updated default server to ", transport.Tag())
  264. }
  265. }
  266. if transport.Type() == C.DNSTypeFakeIP {
  267. if m.fakeIPTransport != nil {
  268. return E.New("multiple fakeip server are not supported")
  269. }
  270. m.fakeIPTransport = transport.(adapter.FakeIPTransport)
  271. }
  272. return nil
  273. }