portal.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package reverse
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/golang/protobuf/proto"
  7. "github.com/xtls/xray-core/common"
  8. "github.com/xtls/xray-core/common/buf"
  9. "github.com/xtls/xray-core/common/mux"
  10. "github.com/xtls/xray-core/common/net"
  11. "github.com/xtls/xray-core/common/session"
  12. "github.com/xtls/xray-core/common/task"
  13. "github.com/xtls/xray-core/features/outbound"
  14. "github.com/xtls/xray-core/transport"
  15. "github.com/xtls/xray-core/transport/pipe"
  16. )
  17. type Portal struct {
  18. ohm outbound.Manager
  19. tag string
  20. domain string
  21. picker *StaticMuxPicker
  22. client *mux.ClientManager
  23. }
  24. func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
  25. if config.Tag == "" {
  26. return nil, newError("portal tag is empty")
  27. }
  28. if config.Domain == "" {
  29. return nil, newError("portal domain is empty")
  30. }
  31. picker, err := NewStaticMuxPicker()
  32. if err != nil {
  33. return nil, err
  34. }
  35. return &Portal{
  36. ohm: ohm,
  37. tag: config.Tag,
  38. domain: config.Domain,
  39. picker: picker,
  40. client: &mux.ClientManager{
  41. Picker: picker,
  42. },
  43. }, nil
  44. }
  45. func (p *Portal) Start() error {
  46. return p.ohm.AddHandler(context.Background(), &Outbound{
  47. portal: p,
  48. tag: p.tag,
  49. })
  50. }
  51. func (p *Portal) Close() error {
  52. return p.ohm.RemoveHandler(context.Background(), p.tag)
  53. }
  54. func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
  55. outboundMeta := session.OutboundFromContext(ctx)
  56. if outboundMeta == nil {
  57. return newError("outbound metadata not found").AtError()
  58. }
  59. if isDomain(outboundMeta.Target, p.domain) {
  60. muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
  61. if err != nil {
  62. return newError("failed to create mux client worker").Base(err).AtWarning()
  63. }
  64. worker, err := NewPortalWorker(muxClient)
  65. if err != nil {
  66. return newError("failed to create portal worker").Base(err)
  67. }
  68. p.picker.AddWorker(worker)
  69. return nil
  70. }
  71. return p.client.Dispatch(ctx, link)
  72. }
  73. type Outbound struct {
  74. portal *Portal
  75. tag string
  76. }
  77. func (o *Outbound) Tag() string {
  78. return o.tag
  79. }
  80. func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
  81. if err := o.portal.HandleConnection(ctx, link); err != nil {
  82. newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
  83. common.Interrupt(link.Writer)
  84. }
  85. }
  86. func (o *Outbound) Start() error {
  87. return nil
  88. }
  89. func (o *Outbound) Close() error {
  90. return nil
  91. }
  92. type StaticMuxPicker struct {
  93. access sync.Mutex
  94. workers []*PortalWorker
  95. cTask *task.Periodic
  96. }
  97. func NewStaticMuxPicker() (*StaticMuxPicker, error) {
  98. p := &StaticMuxPicker{}
  99. p.cTask = &task.Periodic{
  100. Execute: p.cleanup,
  101. Interval: time.Second * 30,
  102. }
  103. p.cTask.Start()
  104. return p, nil
  105. }
  106. func (p *StaticMuxPicker) cleanup() error {
  107. p.access.Lock()
  108. defer p.access.Unlock()
  109. var activeWorkers []*PortalWorker
  110. for _, w := range p.workers {
  111. if !w.Closed() {
  112. activeWorkers = append(activeWorkers, w)
  113. }
  114. }
  115. if len(activeWorkers) != len(p.workers) {
  116. p.workers = activeWorkers
  117. }
  118. return nil
  119. }
  120. func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
  121. p.access.Lock()
  122. defer p.access.Unlock()
  123. if len(p.workers) == 0 {
  124. return nil, newError("empty worker list")
  125. }
  126. var minIdx int = -1
  127. var minConn uint32 = 9999
  128. for i, w := range p.workers {
  129. if w.draining {
  130. continue
  131. }
  132. if w.client.Closed() {
  133. continue
  134. }
  135. if w.client.ActiveConnections() < minConn {
  136. minConn = w.client.ActiveConnections()
  137. minIdx = i
  138. }
  139. }
  140. if minIdx == -1 {
  141. for i, w := range p.workers {
  142. if w.IsFull() {
  143. continue
  144. }
  145. if w.client.ActiveConnections() < minConn {
  146. minConn = w.client.ActiveConnections()
  147. minIdx = i
  148. }
  149. }
  150. }
  151. if minIdx != -1 {
  152. return p.workers[minIdx].client, nil
  153. }
  154. return nil, newError("no mux client worker available")
  155. }
  156. func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
  157. p.access.Lock()
  158. defer p.access.Unlock()
  159. p.workers = append(p.workers, worker)
  160. }
  161. type PortalWorker struct {
  162. client *mux.ClientWorker
  163. control *task.Periodic
  164. writer buf.Writer
  165. reader buf.Reader
  166. draining bool
  167. }
  168. func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
  169. opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
  170. uplinkReader, uplinkWriter := pipe.New(opt...)
  171. downlinkReader, downlinkWriter := pipe.New(opt...)
  172. ctx := context.Background()
  173. ctx = session.ContextWithOutbound(ctx, &session.Outbound{
  174. Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
  175. })
  176. f := client.Dispatch(ctx, &transport.Link{
  177. Reader: uplinkReader,
  178. Writer: downlinkWriter,
  179. })
  180. if !f {
  181. return nil, newError("unable to dispatch control connection")
  182. }
  183. w := &PortalWorker{
  184. client: client,
  185. reader: downlinkReader,
  186. writer: uplinkWriter,
  187. }
  188. w.control = &task.Periodic{
  189. Execute: w.heartbeat,
  190. Interval: time.Second * 2,
  191. }
  192. w.control.Start()
  193. return w, nil
  194. }
  195. func (w *PortalWorker) heartbeat() error {
  196. if w.client.Closed() {
  197. return newError("client worker stopped")
  198. }
  199. if w.draining || w.writer == nil {
  200. return newError("already disposed")
  201. }
  202. msg := &Control{}
  203. msg.FillInRandom()
  204. if w.client.TotalConnections() > 256 {
  205. w.draining = true
  206. msg.State = Control_DRAIN
  207. defer func() {
  208. common.Close(w.writer)
  209. common.Interrupt(w.reader)
  210. w.writer = nil
  211. }()
  212. }
  213. b, err := proto.Marshal(msg)
  214. common.Must(err)
  215. mb := buf.MergeBytes(nil, b)
  216. return w.writer.WriteMultiBuffer(mb)
  217. }
  218. func (w *PortalWorker) IsFull() bool {
  219. return w.client.IsFull()
  220. }
  221. func (w *PortalWorker) Closed() bool {
  222. return w.client.Closed()
  223. }