123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- package reverse
- import (
- "context"
- "sync"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/xtls/xray-core/common"
- "github.com/xtls/xray-core/common/buf"
- "github.com/xtls/xray-core/common/mux"
- "github.com/xtls/xray-core/common/net"
- "github.com/xtls/xray-core/common/session"
- "github.com/xtls/xray-core/common/task"
- "github.com/xtls/xray-core/features/outbound"
- "github.com/xtls/xray-core/transport"
- "github.com/xtls/xray-core/transport/pipe"
- )
- type Portal struct {
- ohm outbound.Manager
- tag string
- domain string
- picker *StaticMuxPicker
- client *mux.ClientManager
- }
- func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
- if config.Tag == "" {
- return nil, newError("portal tag is empty")
- }
- if config.Domain == "" {
- return nil, newError("portal domain is empty")
- }
- picker, err := NewStaticMuxPicker()
- if err != nil {
- return nil, err
- }
- return &Portal{
- ohm: ohm,
- tag: config.Tag,
- domain: config.Domain,
- picker: picker,
- client: &mux.ClientManager{
- Picker: picker,
- },
- }, nil
- }
- func (p *Portal) Start() error {
- return p.ohm.AddHandler(context.Background(), &Outbound{
- portal: p,
- tag: p.tag,
- })
- }
- func (p *Portal) Close() error {
- return p.ohm.RemoveHandler(context.Background(), p.tag)
- }
- func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
- outboundMeta := session.OutboundFromContext(ctx)
- if outboundMeta == nil {
- return newError("outbound metadata not found").AtError()
- }
- if isDomain(outboundMeta.Target, p.domain) {
- muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
- if err != nil {
- return newError("failed to create mux client worker").Base(err).AtWarning()
- }
- worker, err := NewPortalWorker(muxClient)
- if err != nil {
- return newError("failed to create portal worker").Base(err)
- }
- p.picker.AddWorker(worker)
- return nil
- }
- return p.client.Dispatch(ctx, link)
- }
- type Outbound struct {
- portal *Portal
- tag string
- }
- func (o *Outbound) Tag() string {
- return o.tag
- }
- func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
- if err := o.portal.HandleConnection(ctx, link); err != nil {
- newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
- common.Interrupt(link.Writer)
- }
- }
- func (o *Outbound) Start() error {
- return nil
- }
- func (o *Outbound) Close() error {
- return nil
- }
- type StaticMuxPicker struct {
- access sync.Mutex
- workers []*PortalWorker
- cTask *task.Periodic
- }
- func NewStaticMuxPicker() (*StaticMuxPicker, error) {
- p := &StaticMuxPicker{}
- p.cTask = &task.Periodic{
- Execute: p.cleanup,
- Interval: time.Second * 30,
- }
- p.cTask.Start()
- return p, nil
- }
- func (p *StaticMuxPicker) cleanup() error {
- p.access.Lock()
- defer p.access.Unlock()
- var activeWorkers []*PortalWorker
- for _, w := range p.workers {
- if !w.Closed() {
- activeWorkers = append(activeWorkers, w)
- }
- }
- if len(activeWorkers) != len(p.workers) {
- p.workers = activeWorkers
- }
- return nil
- }
- func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
- p.access.Lock()
- defer p.access.Unlock()
- if len(p.workers) == 0 {
- return nil, newError("empty worker list")
- }
- var minIdx int = -1
- var minConn uint32 = 9999
- for i, w := range p.workers {
- if w.draining {
- continue
- }
- if w.client.Closed() {
- continue
- }
- if w.client.ActiveConnections() < minConn {
- minConn = w.client.ActiveConnections()
- minIdx = i
- }
- }
- if minIdx == -1 {
- for i, w := range p.workers {
- if w.IsFull() {
- continue
- }
- if w.client.ActiveConnections() < minConn {
- minConn = w.client.ActiveConnections()
- minIdx = i
- }
- }
- }
- if minIdx != -1 {
- return p.workers[minIdx].client, nil
- }
- return nil, newError("no mux client worker available")
- }
- func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
- p.access.Lock()
- defer p.access.Unlock()
- p.workers = append(p.workers, worker)
- }
- type PortalWorker struct {
- client *mux.ClientWorker
- control *task.Periodic
- writer buf.Writer
- reader buf.Reader
- draining bool
- }
- func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
- opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
- uplinkReader, uplinkWriter := pipe.New(opt...)
- downlinkReader, downlinkWriter := pipe.New(opt...)
- ctx := context.Background()
- ctx = session.ContextWithOutbound(ctx, &session.Outbound{
- Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
- })
- f := client.Dispatch(ctx, &transport.Link{
- Reader: uplinkReader,
- Writer: downlinkWriter,
- })
- if !f {
- return nil, newError("unable to dispatch control connection")
- }
- w := &PortalWorker{
- client: client,
- reader: downlinkReader,
- writer: uplinkWriter,
- }
- w.control = &task.Periodic{
- Execute: w.heartbeat,
- Interval: time.Second * 2,
- }
- w.control.Start()
- return w, nil
- }
- func (w *PortalWorker) heartbeat() error {
- if w.client.Closed() {
- return newError("client worker stopped")
- }
- if w.draining || w.writer == nil {
- return newError("already disposed")
- }
- msg := &Control{}
- msg.FillInRandom()
- if w.client.TotalConnections() > 256 {
- w.draining = true
- msg.State = Control_DRAIN
- defer func() {
- common.Close(w.writer)
- common.Interrupt(w.reader)
- w.writer = nil
- }()
- }
- b, err := proto.Marshal(msg)
- common.Must(err)
- mb := buf.MergeBytes(nil, b)
- return w.writer.WriteMultiBuffer(mb)
- }
- func (w *PortalWorker) IsFull() bool {
- return w.client.IsFull()
- }
- func (w *PortalWorker) Closed() bool {
- return w.client.Closed()
- }
|