portal.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package reverse
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/xtls/xray-core/common"
  7. "github.com/xtls/xray-core/common/buf"
  8. "github.com/xtls/xray-core/common/errors"
  9. "github.com/xtls/xray-core/common/mux"
  10. "github.com/xtls/xray-core/common/net"
  11. "github.com/xtls/xray-core/common/session"
  12. "github.com/xtls/xray-core/common/task"
  13. "github.com/xtls/xray-core/features/outbound"
  14. "github.com/xtls/xray-core/transport"
  15. "github.com/xtls/xray-core/transport/pipe"
  16. "google.golang.org/protobuf/proto"
  17. )
  18. type Portal struct {
  19. ohm outbound.Manager
  20. tag string
  21. domain string
  22. picker *StaticMuxPicker
  23. client *mux.ClientManager
  24. }
  25. func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
  26. if config.Tag == "" {
  27. return nil, errors.New("portal tag is empty")
  28. }
  29. if config.Domain == "" {
  30. return nil, errors.New("portal domain is empty")
  31. }
  32. picker, err := NewStaticMuxPicker()
  33. if err != nil {
  34. return nil, err
  35. }
  36. return &Portal{
  37. ohm: ohm,
  38. tag: config.Tag,
  39. domain: config.Domain,
  40. picker: picker,
  41. client: &mux.ClientManager{
  42. Picker: picker,
  43. },
  44. }, nil
  45. }
  46. func (p *Portal) Start() error {
  47. return p.ohm.AddHandler(context.Background(), &Outbound{
  48. portal: p,
  49. tag: p.tag,
  50. })
  51. }
  52. func (p *Portal) Close() error {
  53. return p.ohm.RemoveHandler(context.Background(), p.tag)
  54. }
  55. func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
  56. outbounds := session.OutboundsFromContext(ctx)
  57. ob := outbounds[len(outbounds)-1]
  58. if ob == nil {
  59. return errors.New("outbound metadata not found").AtError()
  60. }
  61. if isDomain(ob.Target, p.domain) {
  62. muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
  63. if err != nil {
  64. return errors.New("failed to create mux client worker").Base(err).AtWarning()
  65. }
  66. worker, err := NewPortalWorker(muxClient)
  67. if err != nil {
  68. return errors.New("failed to create portal worker").Base(err)
  69. }
  70. p.picker.AddWorker(worker)
  71. return nil
  72. }
  73. return p.client.Dispatch(ctx, link)
  74. }
  75. type Outbound struct {
  76. portal *Portal
  77. tag string
  78. }
  79. func (o *Outbound) Tag() string {
  80. return o.tag
  81. }
  82. func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
  83. if err := o.portal.HandleConnection(ctx, link); err != nil {
  84. errors.LogInfoInner(ctx, err, "failed to process reverse connection")
  85. common.Interrupt(link.Writer)
  86. }
  87. }
  88. func (o *Outbound) Start() error {
  89. return nil
  90. }
  91. func (o *Outbound) Close() error {
  92. return nil
  93. }
  94. type StaticMuxPicker struct {
  95. access sync.Mutex
  96. workers []*PortalWorker
  97. cTask *task.Periodic
  98. }
  99. func NewStaticMuxPicker() (*StaticMuxPicker, error) {
  100. p := &StaticMuxPicker{}
  101. p.cTask = &task.Periodic{
  102. Execute: p.cleanup,
  103. Interval: time.Second * 30,
  104. }
  105. p.cTask.Start()
  106. return p, nil
  107. }
  108. func (p *StaticMuxPicker) cleanup() error {
  109. p.access.Lock()
  110. defer p.access.Unlock()
  111. var activeWorkers []*PortalWorker
  112. for _, w := range p.workers {
  113. if !w.Closed() {
  114. activeWorkers = append(activeWorkers, w)
  115. }
  116. }
  117. if len(activeWorkers) != len(p.workers) {
  118. p.workers = activeWorkers
  119. }
  120. return nil
  121. }
  122. func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
  123. p.access.Lock()
  124. defer p.access.Unlock()
  125. if len(p.workers) == 0 {
  126. return nil, errors.New("empty worker list")
  127. }
  128. var minIdx int = -1
  129. var minConn uint32 = 9999
  130. for i, w := range p.workers {
  131. if w.draining {
  132. continue
  133. }
  134. if w.client.Closed() {
  135. continue
  136. }
  137. if w.client.ActiveConnections() < minConn {
  138. minConn = w.client.ActiveConnections()
  139. minIdx = i
  140. }
  141. }
  142. if minIdx == -1 {
  143. for i, w := range p.workers {
  144. if w.IsFull() {
  145. continue
  146. }
  147. if w.client.ActiveConnections() < minConn {
  148. minConn = w.client.ActiveConnections()
  149. minIdx = i
  150. }
  151. }
  152. }
  153. if minIdx != -1 {
  154. return p.workers[minIdx].client, nil
  155. }
  156. return nil, errors.New("no mux client worker available")
  157. }
  158. func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
  159. p.access.Lock()
  160. defer p.access.Unlock()
  161. p.workers = append(p.workers, worker)
  162. }
  163. type PortalWorker struct {
  164. client *mux.ClientWorker
  165. control *task.Periodic
  166. writer buf.Writer
  167. reader buf.Reader
  168. draining bool
  169. }
  170. func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
  171. opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
  172. uplinkReader, uplinkWriter := pipe.New(opt...)
  173. downlinkReader, downlinkWriter := pipe.New(opt...)
  174. ctx := context.Background()
  175. outbounds := []*session.Outbound{{
  176. Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
  177. }}
  178. ctx = session.ContextWithOutbounds(ctx, outbounds)
  179. f := client.Dispatch(ctx, &transport.Link{
  180. Reader: uplinkReader,
  181. Writer: downlinkWriter,
  182. })
  183. if !f {
  184. return nil, errors.New("unable to dispatch control connection")
  185. }
  186. w := &PortalWorker{
  187. client: client,
  188. reader: downlinkReader,
  189. writer: uplinkWriter,
  190. }
  191. w.control = &task.Periodic{
  192. Execute: w.heartbeat,
  193. Interval: time.Second * 2,
  194. }
  195. w.control.Start()
  196. return w, nil
  197. }
  198. func (w *PortalWorker) heartbeat() error {
  199. if w.client.Closed() {
  200. return errors.New("client worker stopped")
  201. }
  202. if w.draining || w.writer == nil {
  203. return errors.New("already disposed")
  204. }
  205. msg := &Control{}
  206. msg.FillInRandom()
  207. if w.client.TotalConnections() > 256 {
  208. w.draining = true
  209. msg.State = Control_DRAIN
  210. defer func() {
  211. common.Close(w.writer)
  212. common.Interrupt(w.reader)
  213. w.writer = nil
  214. }()
  215. }
  216. b, err := proto.Marshal(msg)
  217. common.Must(err)
  218. mb := buf.MergeBytes(nil, b)
  219. return w.writer.WriteMultiBuffer(mb)
  220. }
  221. func (w *PortalWorker) IsFull() bool {
  222. return w.client.IsFull()
  223. }
  224. func (w *PortalWorker) Closed() bool {
  225. return w.client.Closed()
  226. }