portal.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package reverse
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/xtls/xray-core/common"
  7. "github.com/xtls/xray-core/common/buf"
  8. "github.com/xtls/xray-core/common/errors"
  9. "github.com/xtls/xray-core/common/mux"
  10. "github.com/xtls/xray-core/common/net"
  11. "github.com/xtls/xray-core/common/serial"
  12. "github.com/xtls/xray-core/common/session"
  13. "github.com/xtls/xray-core/common/task"
  14. "github.com/xtls/xray-core/features/outbound"
  15. "github.com/xtls/xray-core/transport"
  16. "github.com/xtls/xray-core/transport/pipe"
  17. "google.golang.org/protobuf/proto"
  18. )
  19. type Portal struct {
  20. ohm outbound.Manager
  21. tag string
  22. domain string
  23. picker *StaticMuxPicker
  24. client *mux.ClientManager
  25. }
  26. func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
  27. if config.Tag == "" {
  28. return nil, errors.New("portal tag is empty")
  29. }
  30. if config.Domain == "" {
  31. return nil, errors.New("portal domain is empty")
  32. }
  33. picker, err := NewStaticMuxPicker()
  34. if err != nil {
  35. return nil, err
  36. }
  37. return &Portal{
  38. ohm: ohm,
  39. tag: config.Tag,
  40. domain: config.Domain,
  41. picker: picker,
  42. client: &mux.ClientManager{
  43. Picker: picker,
  44. },
  45. }, nil
  46. }
  47. func (p *Portal) Start() error {
  48. return p.ohm.AddHandler(context.Background(), &Outbound{
  49. portal: p,
  50. tag: p.tag,
  51. })
  52. }
  53. func (p *Portal) Close() error {
  54. return p.ohm.RemoveHandler(context.Background(), p.tag)
  55. }
  56. func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
  57. outbounds := session.OutboundsFromContext(ctx)
  58. ob := outbounds[len(outbounds)-1]
  59. if ob == nil {
  60. return errors.New("outbound metadata not found").AtError()
  61. }
  62. if isDomain(ob.Target, p.domain) {
  63. muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
  64. if err != nil {
  65. return errors.New("failed to create mux client worker").Base(err).AtWarning()
  66. }
  67. worker, err := NewPortalWorker(muxClient)
  68. if err != nil {
  69. return errors.New("failed to create portal worker").Base(err)
  70. }
  71. p.picker.AddWorker(worker)
  72. return nil
  73. }
  74. return p.client.Dispatch(ctx, link)
  75. }
  76. type Outbound struct {
  77. portal *Portal
  78. tag string
  79. }
  80. func (o *Outbound) Tag() string {
  81. return o.tag
  82. }
  83. func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
  84. if err := o.portal.HandleConnection(ctx, link); err != nil {
  85. errors.LogInfoInner(ctx, err, "failed to process reverse connection")
  86. common.Interrupt(link.Writer)
  87. }
  88. }
  89. func (o *Outbound) Start() error {
  90. return nil
  91. }
  92. func (o *Outbound) Close() error {
  93. return nil
  94. }
  95. // SenderSettings implements outbound.Handler.
  96. func (o *Outbound) SenderSettings() *serial.TypedMessage {
  97. return nil
  98. }
  99. // ProxySettings implements outbound.Handler.
  100. func (o *Outbound) ProxySettings() *serial.TypedMessage {
  101. return nil
  102. }
  103. type StaticMuxPicker struct {
  104. access sync.Mutex
  105. workers []*PortalWorker
  106. cTask *task.Periodic
  107. }
  108. func NewStaticMuxPicker() (*StaticMuxPicker, error) {
  109. p := &StaticMuxPicker{}
  110. p.cTask = &task.Periodic{
  111. Execute: p.cleanup,
  112. Interval: time.Second * 30,
  113. }
  114. p.cTask.Start()
  115. return p, nil
  116. }
  117. func (p *StaticMuxPicker) cleanup() error {
  118. p.access.Lock()
  119. defer p.access.Unlock()
  120. var activeWorkers []*PortalWorker
  121. for _, w := range p.workers {
  122. if !w.Closed() {
  123. activeWorkers = append(activeWorkers, w)
  124. }
  125. }
  126. if len(activeWorkers) != len(p.workers) {
  127. p.workers = activeWorkers
  128. }
  129. return nil
  130. }
  131. func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
  132. p.access.Lock()
  133. defer p.access.Unlock()
  134. if len(p.workers) == 0 {
  135. return nil, errors.New("empty worker list")
  136. }
  137. var minIdx int = -1
  138. var minConn uint32 = 9999
  139. for i, w := range p.workers {
  140. if w.draining {
  141. continue
  142. }
  143. if w.IsFull() {
  144. continue
  145. }
  146. if w.client.ActiveConnections() < minConn {
  147. minConn = w.client.ActiveConnections()
  148. minIdx = i
  149. }
  150. }
  151. if minIdx == -1 {
  152. for i, w := range p.workers {
  153. if w.IsFull() {
  154. continue
  155. }
  156. if w.client.ActiveConnections() < minConn {
  157. minConn = w.client.ActiveConnections()
  158. minIdx = i
  159. }
  160. }
  161. }
  162. if minIdx != -1 {
  163. return p.workers[minIdx].client, nil
  164. }
  165. return nil, errors.New("no mux client worker available")
  166. }
  167. func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
  168. p.access.Lock()
  169. defer p.access.Unlock()
  170. p.workers = append(p.workers, worker)
  171. }
  172. type PortalWorker struct {
  173. client *mux.ClientWorker
  174. control *task.Periodic
  175. writer buf.Writer
  176. reader buf.Reader
  177. draining bool
  178. counter uint32
  179. }
  180. func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
  181. opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
  182. uplinkReader, uplinkWriter := pipe.New(opt...)
  183. downlinkReader, downlinkWriter := pipe.New(opt...)
  184. ctx := context.Background()
  185. outbounds := []*session.Outbound{{
  186. Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
  187. }}
  188. ctx = session.ContextWithOutbounds(ctx, outbounds)
  189. f := client.Dispatch(ctx, &transport.Link{
  190. Reader: uplinkReader,
  191. Writer: downlinkWriter,
  192. })
  193. if !f {
  194. return nil, errors.New("unable to dispatch control connection")
  195. }
  196. w := &PortalWorker{
  197. client: client,
  198. reader: downlinkReader,
  199. writer: uplinkWriter,
  200. }
  201. w.control = &task.Periodic{
  202. Execute: w.heartbeat,
  203. Interval: time.Second * 2,
  204. }
  205. w.control.Start()
  206. return w, nil
  207. }
  208. func (w *PortalWorker) heartbeat() error {
  209. if w.Closed() {
  210. return errors.New("client worker stopped")
  211. }
  212. if w.draining || w.writer == nil {
  213. return errors.New("already disposed")
  214. }
  215. msg := &Control{}
  216. msg.FillInRandom()
  217. if w.client.TotalConnections() > 256 {
  218. w.draining = true
  219. msg.State = Control_DRAIN
  220. defer func() {
  221. w.client.GetTimer().Reset(time.Second * 16)
  222. common.Close(w.writer)
  223. common.Interrupt(w.reader)
  224. w.writer = nil
  225. }()
  226. }
  227. w.counter = (w.counter + 1) % 5
  228. if w.draining || w.counter == 1 {
  229. b, err := proto.Marshal(msg)
  230. common.Must(err)
  231. mb := buf.MergeBytes(nil, b)
  232. return w.writer.WriteMultiBuffer(mb)
  233. }
  234. return nil
  235. }
  236. func (w *PortalWorker) IsFull() bool {
  237. return w.client.IsFull()
  238. }
  239. func (w *PortalWorker) Closed() bool {
  240. return w.client.Closed()
  241. }