always.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package inbound
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/app/proxyman"
  5. "github.com/xtls/xray-core/common"
  6. "github.com/xtls/xray-core/common/errors"
  7. "github.com/xtls/xray-core/common/mux"
  8. "github.com/xtls/xray-core/common/net"
  9. "github.com/xtls/xray-core/common/serial"
  10. "github.com/xtls/xray-core/common/session"
  11. "github.com/xtls/xray-core/core"
  12. "github.com/xtls/xray-core/features/policy"
  13. "github.com/xtls/xray-core/features/stats"
  14. "github.com/xtls/xray-core/proxy"
  15. "github.com/xtls/xray-core/transport/internet"
  16. "google.golang.org/protobuf/proto"
  17. )
  18. func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
  19. var uplinkCounter stats.Counter
  20. var downlinkCounter stats.Counter
  21. policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
  22. if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
  23. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  24. name := "inbound>>>" + tag + ">>>traffic>>>uplink"
  25. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  26. if c != nil {
  27. uplinkCounter = c
  28. }
  29. }
  30. if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
  31. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  32. name := "inbound>>>" + tag + ">>>traffic>>>downlink"
  33. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  34. if c != nil {
  35. downlinkCounter = c
  36. }
  37. }
  38. return uplinkCounter, downlinkCounter
  39. }
  40. type AlwaysOnInboundHandler struct {
  41. proxyConfig interface{}
  42. receiverConfig *proxyman.ReceiverConfig
  43. proxy proxy.Inbound
  44. workers []worker
  45. mux *mux.Server
  46. tag string
  47. }
  48. func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
  49. // Set tag and sniffing config in context before creating proxy
  50. // This allows proxies like TUN to access these settings
  51. ctx = session.ContextWithInbound(ctx, &session.Inbound{Tag: tag})
  52. if receiverConfig.SniffingSettings != nil {
  53. ctx = session.ContextWithContent(ctx, &session.Content{
  54. SniffingRequest: session.SniffingRequest{
  55. Enabled: receiverConfig.SniffingSettings.Enabled,
  56. OverrideDestinationForProtocol: receiverConfig.SniffingSettings.DestinationOverride,
  57. ExcludeForDomain: receiverConfig.SniffingSettings.DomainsExcluded,
  58. MetadataOnly: receiverConfig.SniffingSettings.MetadataOnly,
  59. RouteOnly: receiverConfig.SniffingSettings.RouteOnly,
  60. },
  61. })
  62. }
  63. rawProxy, err := common.CreateObject(ctx, proxyConfig)
  64. if err != nil {
  65. return nil, err
  66. }
  67. p, ok := rawProxy.(proxy.Inbound)
  68. if !ok {
  69. return nil, errors.New("not an inbound proxy.")
  70. }
  71. h := &AlwaysOnInboundHandler{
  72. receiverConfig: receiverConfig,
  73. proxyConfig: proxyConfig,
  74. proxy: p,
  75. mux: mux.NewServer(ctx),
  76. tag: tag,
  77. }
  78. uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
  79. nl := p.Network()
  80. pl := receiverConfig.PortList
  81. address := receiverConfig.Listen.AsAddress()
  82. if address == nil {
  83. address = net.AnyIP
  84. }
  85. mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
  86. if err != nil {
  87. return nil, errors.New("failed to parse stream config").Base(err).AtWarning()
  88. }
  89. if receiverConfig.ReceiveOriginalDestination {
  90. if mss.SocketSettings == nil {
  91. mss.SocketSettings = &internet.SocketConfig{}
  92. }
  93. if mss.SocketSettings.Tproxy == internet.SocketConfig_Off {
  94. mss.SocketSettings.Tproxy = internet.SocketConfig_Redirect
  95. }
  96. mss.SocketSettings.ReceiveOriginalDestAddress = true
  97. }
  98. if pl == nil {
  99. if net.HasNetwork(nl, net.Network_UNIX) {
  100. errors.LogDebug(ctx, "creating unix domain socket worker on ", address)
  101. worker := &dsWorker{
  102. address: address,
  103. proxy: p,
  104. stream: mss,
  105. tag: tag,
  106. dispatcher: h.mux,
  107. sniffingConfig: receiverConfig.SniffingSettings,
  108. uplinkCounter: uplinkCounter,
  109. downlinkCounter: downlinkCounter,
  110. ctx: ctx,
  111. }
  112. h.workers = append(h.workers, worker)
  113. }
  114. }
  115. if pl != nil {
  116. for _, pr := range pl.Range {
  117. for port := pr.From; port <= pr.To; port++ {
  118. if net.HasNetwork(nl, net.Network_TCP) {
  119. errors.LogDebug(ctx, "creating stream worker on ", address, ":", port)
  120. worker := &tcpWorker{
  121. address: address,
  122. port: net.Port(port),
  123. proxy: p,
  124. stream: mss,
  125. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  126. tag: tag,
  127. dispatcher: h.mux,
  128. sniffingConfig: receiverConfig.SniffingSettings,
  129. uplinkCounter: uplinkCounter,
  130. downlinkCounter: downlinkCounter,
  131. ctx: ctx,
  132. }
  133. h.workers = append(h.workers, worker)
  134. }
  135. if net.HasNetwork(nl, net.Network_UDP) {
  136. worker := &udpWorker{
  137. tag: tag,
  138. proxy: p,
  139. address: address,
  140. port: net.Port(port),
  141. dispatcher: h.mux,
  142. sniffingConfig: receiverConfig.SniffingSettings,
  143. uplinkCounter: uplinkCounter,
  144. downlinkCounter: downlinkCounter,
  145. stream: mss,
  146. ctx: ctx,
  147. }
  148. h.workers = append(h.workers, worker)
  149. }
  150. }
  151. }
  152. }
  153. return h, nil
  154. }
  155. // Start implements common.Runnable.
  156. func (h *AlwaysOnInboundHandler) Start() error {
  157. for _, worker := range h.workers {
  158. if err := worker.Start(); err != nil {
  159. return err
  160. }
  161. }
  162. return nil
  163. }
  164. // Close implements common.Closable.
  165. func (h *AlwaysOnInboundHandler) Close() error {
  166. var errs []error
  167. for _, worker := range h.workers {
  168. errs = append(errs, worker.Close())
  169. }
  170. errs = append(errs, h.mux.Close())
  171. if err := errors.Combine(errs...); err != nil {
  172. return errors.New("failed to close all resources").Base(err)
  173. }
  174. return nil
  175. }
  176. func (h *AlwaysOnInboundHandler) Tag() string {
  177. return h.tag
  178. }
  179. func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
  180. return h.proxy
  181. }
  182. // ReceiverSettings implements inbound.Handler.
  183. func (h *AlwaysOnInboundHandler) ReceiverSettings() *serial.TypedMessage {
  184. return serial.ToTypedMessage(h.receiverConfig)
  185. }
  186. // ProxySettings implements inbound.Handler.
  187. func (h *AlwaysOnInboundHandler) ProxySettings() *serial.TypedMessage {
  188. if v, ok := h.proxyConfig.(proto.Message); ok {
  189. return serial.ToTypedMessage(v)
  190. }
  191. return nil
  192. }