always.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package inbound
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/app/proxyman"
  5. "github.com/xtls/xray-core/common"
  6. "github.com/xtls/xray-core/common/errors"
  7. "github.com/xtls/xray-core/common/mux"
  8. "github.com/xtls/xray-core/common/net"
  9. "github.com/xtls/xray-core/common/serial"
  10. "github.com/xtls/xray-core/core"
  11. "github.com/xtls/xray-core/features/policy"
  12. "github.com/xtls/xray-core/features/stats"
  13. "github.com/xtls/xray-core/proxy"
  14. "github.com/xtls/xray-core/transport/internet"
  15. "google.golang.org/protobuf/proto"
  16. )
  17. func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
  18. var uplinkCounter stats.Counter
  19. var downlinkCounter stats.Counter
  20. policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
  21. if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
  22. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  23. name := "inbound>>>" + tag + ">>>traffic>>>uplink"
  24. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  25. if c != nil {
  26. uplinkCounter = c
  27. }
  28. }
  29. if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
  30. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  31. name := "inbound>>>" + tag + ">>>traffic>>>downlink"
  32. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  33. if c != nil {
  34. downlinkCounter = c
  35. }
  36. }
  37. return uplinkCounter, downlinkCounter
  38. }
  39. type AlwaysOnInboundHandler struct {
  40. proxyConfig interface{}
  41. receiverConfig *proxyman.ReceiverConfig
  42. proxy proxy.Inbound
  43. workers []worker
  44. mux *mux.Server
  45. tag string
  46. }
  47. func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
  48. rawProxy, err := common.CreateObject(ctx, proxyConfig)
  49. if err != nil {
  50. return nil, err
  51. }
  52. p, ok := rawProxy.(proxy.Inbound)
  53. if !ok {
  54. return nil, errors.New("not an inbound proxy.")
  55. }
  56. h := &AlwaysOnInboundHandler{
  57. receiverConfig: receiverConfig,
  58. proxyConfig: proxyConfig,
  59. proxy: p,
  60. mux: mux.NewServer(ctx),
  61. tag: tag,
  62. }
  63. uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
  64. nl := p.Network()
  65. pl := receiverConfig.PortList
  66. address := receiverConfig.Listen.AsAddress()
  67. if address == nil {
  68. address = net.AnyIP
  69. }
  70. mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
  71. if err != nil {
  72. return nil, errors.New("failed to parse stream config").Base(err).AtWarning()
  73. }
  74. if receiverConfig.ReceiveOriginalDestination {
  75. if mss.SocketSettings == nil {
  76. mss.SocketSettings = &internet.SocketConfig{}
  77. }
  78. if mss.SocketSettings.Tproxy == internet.SocketConfig_Off {
  79. mss.SocketSettings.Tproxy = internet.SocketConfig_Redirect
  80. }
  81. mss.SocketSettings.ReceiveOriginalDestAddress = true
  82. }
  83. if pl == nil {
  84. if net.HasNetwork(nl, net.Network_UNIX) {
  85. errors.LogDebug(ctx, "creating unix domain socket worker on ", address)
  86. worker := &dsWorker{
  87. address: address,
  88. proxy: p,
  89. stream: mss,
  90. tag: tag,
  91. dispatcher: h.mux,
  92. sniffingConfig: receiverConfig.SniffingSettings,
  93. uplinkCounter: uplinkCounter,
  94. downlinkCounter: downlinkCounter,
  95. ctx: ctx,
  96. }
  97. h.workers = append(h.workers, worker)
  98. }
  99. }
  100. if pl != nil {
  101. for _, pr := range pl.Range {
  102. for port := pr.From; port <= pr.To; port++ {
  103. if net.HasNetwork(nl, net.Network_TCP) {
  104. errors.LogDebug(ctx, "creating stream worker on ", address, ":", port)
  105. worker := &tcpWorker{
  106. address: address,
  107. port: net.Port(port),
  108. proxy: p,
  109. stream: mss,
  110. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  111. tag: tag,
  112. dispatcher: h.mux,
  113. sniffingConfig: receiverConfig.SniffingSettings,
  114. uplinkCounter: uplinkCounter,
  115. downlinkCounter: downlinkCounter,
  116. ctx: ctx,
  117. }
  118. h.workers = append(h.workers, worker)
  119. }
  120. if net.HasNetwork(nl, net.Network_UDP) {
  121. worker := &udpWorker{
  122. tag: tag,
  123. proxy: p,
  124. address: address,
  125. port: net.Port(port),
  126. dispatcher: h.mux,
  127. sniffingConfig: receiverConfig.SniffingSettings,
  128. uplinkCounter: uplinkCounter,
  129. downlinkCounter: downlinkCounter,
  130. stream: mss,
  131. ctx: ctx,
  132. }
  133. h.workers = append(h.workers, worker)
  134. }
  135. }
  136. }
  137. }
  138. return h, nil
  139. }
  140. // Start implements common.Runnable.
  141. func (h *AlwaysOnInboundHandler) Start() error {
  142. for _, worker := range h.workers {
  143. if err := worker.Start(); err != nil {
  144. return err
  145. }
  146. }
  147. return nil
  148. }
  149. // Close implements common.Closable.
  150. func (h *AlwaysOnInboundHandler) Close() error {
  151. var errs []error
  152. for _, worker := range h.workers {
  153. errs = append(errs, worker.Close())
  154. }
  155. errs = append(errs, h.mux.Close())
  156. if err := errors.Combine(errs...); err != nil {
  157. return errors.New("failed to close all resources").Base(err)
  158. }
  159. return nil
  160. }
  161. func (h *AlwaysOnInboundHandler) Tag() string {
  162. return h.tag
  163. }
  164. func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
  165. return h.proxy
  166. }
  167. // ReceiverSettings implements inbound.Handler.
  168. func (h *AlwaysOnInboundHandler) ReceiverSettings() *serial.TypedMessage {
  169. return serial.ToTypedMessage(h.receiverConfig)
  170. }
  171. // ProxySettings implements inbound.Handler.
  172. func (h *AlwaysOnInboundHandler) ProxySettings() *serial.TypedMessage {
  173. if v, ok := h.proxyConfig.(proto.Message); ok {
  174. return serial.ToTypedMessage(v)
  175. }
  176. return nil
  177. }