always.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package inbound
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/app/proxyman"
  5. "github.com/xtls/xray-core/common"
  6. "github.com/xtls/xray-core/common/dice"
  7. "github.com/xtls/xray-core/common/errors"
  8. "github.com/xtls/xray-core/common/mux"
  9. "github.com/xtls/xray-core/common/net"
  10. "github.com/xtls/xray-core/core"
  11. "github.com/xtls/xray-core/features/policy"
  12. "github.com/xtls/xray-core/features/stats"
  13. "github.com/xtls/xray-core/proxy"
  14. "github.com/xtls/xray-core/transport/internet"
  15. )
  16. func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
  17. var uplinkCounter stats.Counter
  18. var downlinkCounter stats.Counter
  19. policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
  20. if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
  21. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  22. name := "inbound>>>" + tag + ">>>traffic>>>uplink"
  23. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  24. if c != nil {
  25. uplinkCounter = c
  26. }
  27. }
  28. if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
  29. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  30. name := "inbound>>>" + tag + ">>>traffic>>>downlink"
  31. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  32. if c != nil {
  33. downlinkCounter = c
  34. }
  35. }
  36. return uplinkCounter, downlinkCounter
  37. }
  38. type AlwaysOnInboundHandler struct {
  39. proxy proxy.Inbound
  40. workers []worker
  41. mux *mux.Server
  42. tag string
  43. }
  44. func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
  45. rawProxy, err := common.CreateObject(ctx, proxyConfig)
  46. if err != nil {
  47. return nil, err
  48. }
  49. p, ok := rawProxy.(proxy.Inbound)
  50. if !ok {
  51. return nil, newError("not an inbound proxy.")
  52. }
  53. h := &AlwaysOnInboundHandler{
  54. proxy: p,
  55. mux: mux.NewServer(ctx),
  56. tag: tag,
  57. }
  58. uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
  59. nl := p.Network()
  60. pl := receiverConfig.PortList
  61. address := receiverConfig.Listen.AsAddress()
  62. if address == nil {
  63. address = net.AnyIP
  64. }
  65. mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
  66. if err != nil {
  67. return nil, newError("failed to parse stream config").Base(err).AtWarning()
  68. }
  69. if receiverConfig.ReceiveOriginalDestination {
  70. if mss.SocketSettings == nil {
  71. mss.SocketSettings = &internet.SocketConfig{}
  72. }
  73. if mss.SocketSettings.Tproxy == internet.SocketConfig_Off {
  74. mss.SocketSettings.Tproxy = internet.SocketConfig_Redirect
  75. }
  76. mss.SocketSettings.ReceiveOriginalDestAddress = true
  77. }
  78. if pl == nil {
  79. if net.HasNetwork(nl, net.Network_UNIX) {
  80. newError("creating unix domain socket worker on ", address).AtDebug().WriteToLog()
  81. worker := &dsWorker{
  82. address: address,
  83. proxy: p,
  84. stream: mss,
  85. tag: tag,
  86. dispatcher: h.mux,
  87. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  88. uplinkCounter: uplinkCounter,
  89. downlinkCounter: downlinkCounter,
  90. ctx: ctx,
  91. }
  92. h.workers = append(h.workers, worker)
  93. }
  94. }
  95. if pl != nil {
  96. for _, pr := range pl.Range {
  97. for port := pr.From; port <= pr.To; port++ {
  98. if net.HasNetwork(nl, net.Network_TCP) {
  99. newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()
  100. worker := &tcpWorker{
  101. address: address,
  102. port: net.Port(port),
  103. proxy: p,
  104. stream: mss,
  105. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  106. tag: tag,
  107. dispatcher: h.mux,
  108. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  109. uplinkCounter: uplinkCounter,
  110. downlinkCounter: downlinkCounter,
  111. ctx: ctx,
  112. }
  113. h.workers = append(h.workers, worker)
  114. }
  115. if net.HasNetwork(nl, net.Network_UDP) {
  116. worker := &udpWorker{
  117. tag: tag,
  118. proxy: p,
  119. address: address,
  120. port: net.Port(port),
  121. dispatcher: h.mux,
  122. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  123. uplinkCounter: uplinkCounter,
  124. downlinkCounter: downlinkCounter,
  125. stream: mss,
  126. ctx: ctx,
  127. }
  128. h.workers = append(h.workers, worker)
  129. }
  130. }
  131. }
  132. }
  133. return h, nil
  134. }
  135. // Start implements common.Runnable.
  136. func (h *AlwaysOnInboundHandler) Start() error {
  137. for _, worker := range h.workers {
  138. if err := worker.Start(); err != nil {
  139. return err
  140. }
  141. }
  142. return nil
  143. }
  144. // Close implements common.Closable.
  145. func (h *AlwaysOnInboundHandler) Close() error {
  146. var errs []error
  147. for _, worker := range h.workers {
  148. errs = append(errs, worker.Close())
  149. }
  150. errs = append(errs, h.mux.Close())
  151. if err := errors.Combine(errs...); err != nil {
  152. return newError("failed to close all resources").Base(err)
  153. }
  154. return nil
  155. }
  156. func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {
  157. if len(h.workers) == 0 {
  158. return nil, 0, 0
  159. }
  160. w := h.workers[dice.Roll(len(h.workers))]
  161. return w.Proxy(), w.Port(), 9999
  162. }
  163. func (h *AlwaysOnInboundHandler) Tag() string {
  164. return h.tag
  165. }
  166. func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
  167. return h.proxy
  168. }