always.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package inbound
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/app/proxyman"
  5. "github.com/xtls/xray-core/common"
  6. "github.com/xtls/xray-core/common/dice"
  7. "github.com/xtls/xray-core/common/errors"
  8. "github.com/xtls/xray-core/common/mux"
  9. "github.com/xtls/xray-core/common/net"
  10. "github.com/xtls/xray-core/common/serial"
  11. "github.com/xtls/xray-core/core"
  12. "github.com/xtls/xray-core/features/policy"
  13. "github.com/xtls/xray-core/features/stats"
  14. "github.com/xtls/xray-core/proxy"
  15. "github.com/xtls/xray-core/transport/internet"
  16. "google.golang.org/protobuf/proto"
  17. )
  18. func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
  19. var uplinkCounter stats.Counter
  20. var downlinkCounter stats.Counter
  21. policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
  22. if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
  23. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  24. name := "inbound>>>" + tag + ">>>traffic>>>uplink"
  25. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  26. if c != nil {
  27. uplinkCounter = c
  28. }
  29. }
  30. if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
  31. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  32. name := "inbound>>>" + tag + ">>>traffic>>>downlink"
  33. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  34. if c != nil {
  35. downlinkCounter = c
  36. }
  37. }
  38. return uplinkCounter, downlinkCounter
  39. }
// AlwaysOnInboundHandler is an inbound handler that owns a fixed set of
// listening workers, all created up front by NewAlwaysOnInboundHandler and
// started/closed together through Start and Close.
type AlwaysOnInboundHandler struct {
	// proxyConfig is the proxy configuration the handler was built from; it is
	// exposed through ProxySettings when it is a proto.Message.
	proxyConfig interface{}
	// receiverConfig is the receiver configuration exposed through
	// ReceiverSettings.
	receiverConfig *proxyman.ReceiverConfig
	// proxy is the inbound proxy created from proxyConfig.
	proxy proxy.Inbound
	// workers holds one worker per listening endpoint created by the
	// constructor.
	workers []worker
	// mux is the mux server handed to every worker as its dispatcher; it is
	// closed in Close.
	mux *mux.Server
	// tag is the handler's tag, returned by Tag.
	tag string
}
  48. func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
  49. rawProxy, err := common.CreateObject(ctx, proxyConfig)
  50. if err != nil {
  51. return nil, err
  52. }
  53. p, ok := rawProxy.(proxy.Inbound)
  54. if !ok {
  55. return nil, errors.New("not an inbound proxy.")
  56. }
  57. h := &AlwaysOnInboundHandler{
  58. receiverConfig: receiverConfig,
  59. proxyConfig: proxyConfig,
  60. proxy: p,
  61. mux: mux.NewServer(ctx),
  62. tag: tag,
  63. }
  64. uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
  65. nl := p.Network()
  66. pl := receiverConfig.PortList
  67. address := receiverConfig.Listen.AsAddress()
  68. if address == nil {
  69. address = net.AnyIP
  70. }
  71. mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
  72. if err != nil {
  73. return nil, errors.New("failed to parse stream config").Base(err).AtWarning()
  74. }
  75. if receiverConfig.ReceiveOriginalDestination {
  76. if mss.SocketSettings == nil {
  77. mss.SocketSettings = &internet.SocketConfig{}
  78. }
  79. if mss.SocketSettings.Tproxy == internet.SocketConfig_Off {
  80. mss.SocketSettings.Tproxy = internet.SocketConfig_Redirect
  81. }
  82. mss.SocketSettings.ReceiveOriginalDestAddress = true
  83. }
  84. if pl == nil {
  85. if net.HasNetwork(nl, net.Network_UNIX) {
  86. errors.LogDebug(ctx, "creating unix domain socket worker on ", address)
  87. worker := &dsWorker{
  88. address: address,
  89. proxy: p,
  90. stream: mss,
  91. tag: tag,
  92. dispatcher: h.mux,
  93. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  94. uplinkCounter: uplinkCounter,
  95. downlinkCounter: downlinkCounter,
  96. ctx: ctx,
  97. }
  98. h.workers = append(h.workers, worker)
  99. }
  100. }
  101. if pl != nil {
  102. for _, pr := range pl.Range {
  103. for port := pr.From; port <= pr.To; port++ {
  104. if net.HasNetwork(nl, net.Network_TCP) {
  105. errors.LogDebug(ctx, "creating stream worker on ", address, ":", port)
  106. worker := &tcpWorker{
  107. address: address,
  108. port: net.Port(port),
  109. proxy: p,
  110. stream: mss,
  111. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  112. tag: tag,
  113. dispatcher: h.mux,
  114. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  115. uplinkCounter: uplinkCounter,
  116. downlinkCounter: downlinkCounter,
  117. ctx: ctx,
  118. }
  119. h.workers = append(h.workers, worker)
  120. }
  121. if net.HasNetwork(nl, net.Network_UDP) {
  122. worker := &udpWorker{
  123. tag: tag,
  124. proxy: p,
  125. address: address,
  126. port: net.Port(port),
  127. dispatcher: h.mux,
  128. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  129. uplinkCounter: uplinkCounter,
  130. downlinkCounter: downlinkCounter,
  131. stream: mss,
  132. ctx: ctx,
  133. }
  134. h.workers = append(h.workers, worker)
  135. }
  136. }
  137. }
  138. }
  139. return h, nil
  140. }
  141. // Start implements common.Runnable.
  142. func (h *AlwaysOnInboundHandler) Start() error {
  143. for _, worker := range h.workers {
  144. if err := worker.Start(); err != nil {
  145. return err
  146. }
  147. }
  148. return nil
  149. }
  150. // Close implements common.Closable.
  151. func (h *AlwaysOnInboundHandler) Close() error {
  152. var errs []error
  153. for _, worker := range h.workers {
  154. errs = append(errs, worker.Close())
  155. }
  156. errs = append(errs, h.mux.Close())
  157. if err := errors.Combine(errs...); err != nil {
  158. return errors.New("failed to close all resources").Base(err)
  159. }
  160. return nil
  161. }
  162. func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {
  163. if len(h.workers) == 0 {
  164. return nil, 0, 0
  165. }
  166. w := h.workers[dice.Roll(len(h.workers))]
  167. return w.Proxy(), w.Port(), 9999
  168. }
// Tag returns the tag this handler was created with.
func (h *AlwaysOnInboundHandler) Tag() string {
	return h.tag
}
// GetInbound returns the inbound proxy held by this handler, i.e. the object
// created from the proxy configuration in NewAlwaysOnInboundHandler.
func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
	return h.proxy
}
// ReceiverSettings implements inbound.Handler.
//
// It returns the handler's receiver configuration converted with
// serial.ToTypedMessage.
func (h *AlwaysOnInboundHandler) ReceiverSettings() *serial.TypedMessage {
	return serial.ToTypedMessage(h.receiverConfig)
}
  179. // ProxySettings implements inbound.Handler.
  180. func (h *AlwaysOnInboundHandler) ProxySettings() *serial.TypedMessage {
  181. if v, ok := h.proxyConfig.(proto.Message); ok {
  182. return serial.ToTypedMessage(v)
  183. }
  184. return nil
  185. }