Browse Source

Add brutal

Fangliding 3 months ago
parent
commit
b33fde2e44

+ 11 - 2
app/proxyman/config.pb.go

@@ -411,6 +411,7 @@ type MultiplexingConfig struct {
 	XudpConcurrency int32 `protobuf:"varint,3,opt,name=xudpConcurrency,proto3" json:"xudpConcurrency,omitempty"`
 	// "reject" (default), "allow" or "skip".
 	XudpProxyUDP443 string `protobuf:"bytes,4,opt,name=xudpProxyUDP443,proto3" json:"xudpProxyUDP443,omitempty"`
+	BrutalBPS       uint64 `protobuf:"varint,5,opt,name=brutalBPS,proto3" json:"brutalBPS,omitempty"`
 	unknownFields   protoimpl.UnknownFields
 	sizeCache       protoimpl.SizeCache
 }
@@ -473,6 +474,13 @@ func (x *MultiplexingConfig) GetXudpProxyUDP443() string {
 	return ""
 }
 
+func (x *MultiplexingConfig) GetBrutalBPS() uint64 {
+	if x != nil {
+		return x.BrutalBPS
+	}
+	return 0
+}
+
 var File_app_proxyman_config_proto protoreflect.FileDescriptor
 
 const file_app_proxyman_config_proto_rawDesc = "" +
@@ -503,12 +511,13 @@ const file_app_proxyman_config_proto_rawDesc = "" +
 	"\x0eproxy_settings\x18\x03 \x01(\v2$.xray.transport.internet.ProxyConfigR\rproxySettings\x12T\n" +
 	"\x12multiplex_settings\x18\x04 \x01(\v2%.xray.app.proxyman.MultiplexingConfigR\x11multiplexSettings\x12\x19\n" +
 	"\bvia_cidr\x18\x05 \x01(\tR\aviaCidr\x12P\n" +
-	"\x0ftarget_strategy\x18\x06 \x01(\x0e2'.xray.transport.internet.DomainStrategyR\x0etargetStrategy\"\xa4\x01\n" +
+	"\x0ftarget_strategy\x18\x06 \x01(\x0e2'.xray.transport.internet.DomainStrategyR\x0etargetStrategy\"\xc2\x01\n" +
 	"\x12MultiplexingConfig\x12\x18\n" +
 	"\aenabled\x18\x01 \x01(\bR\aenabled\x12 \n" +
 	"\vconcurrency\x18\x02 \x01(\x05R\vconcurrency\x12(\n" +
 	"\x0fxudpConcurrency\x18\x03 \x01(\x05R\x0fxudpConcurrency\x12(\n" +
-	"\x0fxudpProxyUDP443\x18\x04 \x01(\tR\x0fxudpProxyUDP443BU\n" +
+	"\x0fxudpProxyUDP443\x18\x04 \x01(\tR\x0fxudpProxyUDP443\x12\x1c\n" +
+	"\tbrutalBPS\x18\x05 \x01(\x04R\tbrutalBPSBU\n" +
 	"\x15com.xray.app.proxymanP\x01Z&github.com/xtls/xray-core/app/proxyman\xaa\x02\x11Xray.App.Proxymanb\x06proto3"
 
 var (

+ 1 - 0
app/proxyman/config.proto

@@ -68,4 +68,5 @@ message MultiplexingConfig {
   int32 xudpConcurrency = 3;
   // "reject" (default), "allow" or "skip".
   string xudpProxyUDP443 = 4;
+  uint64 brutalBPS = 5;
 }

+ 4 - 2
app/proxyman/outbound/handler.go

@@ -136,7 +136,8 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbou
 							Dialer: h,
 							Strategy: mux.ClientStrategy{
 								MaxConcurrency: uint32(config.Concurrency),
-								MaxConnection:  128,
+								MaxReuseTimes:  32768,
+								BrutalBPS:      config.BrutalBPS,
 							},
 						},
 					},
@@ -157,7 +158,8 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbou
 							Dialer: h,
 							Strategy: mux.ClientStrategy{
 								MaxConcurrency: uint32(config.XudpConcurrency),
-								MaxConnection:  128,
+								MaxReuseTimes:  32768,
+								BrutalBPS:      config.BrutalBPS,
 							},
 						},
 					},

+ 6 - 0
common/buf/multi_buffer.go

@@ -75,6 +75,12 @@ func (mb MultiBuffer) Copy(b []byte) int {
 	return total
 }
 
+func (mb MultiBuffer) Bytes() []byte {
+	b := make([]byte, mb.Len())
+	mb.Copy(b)
+	return b
+}
+
 // ReadFrom reads all content from reader until EOF.
 func ReadFrom(reader io.Reader) (MultiBuffer, error) {
 	mb := make(MultiBuffer, 0, 16)

+ 27 - 0
common/mux/client.go

@@ -2,6 +2,7 @@ package mux
 
 import (
 	"context"
+	"encoding/binary"
 	goerrors "errors"
 	"io"
 	"sync"
@@ -9,6 +10,7 @@ import (
 
 	"github.com/xtls/xray-core/common"
 	"github.com/xtls/xray-core/common/buf"
+	"github.com/xtls/xray-core/common/dice"
 	"github.com/xtls/xray-core/common/errors"
 	"github.com/xtls/xray-core/common/net"
 	"github.com/xtls/xray-core/common/protocol"
@@ -171,6 +173,7 @@ func (f *DialingWorkerFactory) Create() (*ClientWorker, error) {
 type ClientStrategy struct {
 	MaxConcurrency uint32
 	MaxReuseTimes  uint32
+	BrutalBPS      uint64
 }
 
 type ClientWorker struct {
@@ -198,6 +201,9 @@ func NewClientWorker(stream transport.Link, s ClientStrategy) (*ClientWorker, er
 
 	go c.fetchOutput()
 	go c.monitor()
+	if s.BrutalBPS > 0 {
+		go c.sendSetBrutal(s.BrutalBPS)
+	}
 
 	return c, nil
 }
@@ -417,3 +423,24 @@ func (m *ClientWorker) fetchOutput() {
 		}
 	}
 }
+
+func (m *ClientWorker) sendSetBrutal(sendBPS uint64) {
+	meta := FrameMetadata{
+		SessionID:     91,
+		SessionStatus: SessionStatusSetBrutal,
+	}
+	meta.Option.Set(OptionData)
+	frame := buf.New()
+	common.Must(meta.WriteTo(frame))
+	lengthByte := frame.Extend(2)
+	speedByte := frame.Extend(int32(200 + dice.Roll(200)))
+	binary.BigEndian.PutUint64(speedByte, sendBPS)
+	binary.BigEndian.PutUint16(lengthByte, uint16(len(speedByte)))
+	errors.LogError(context.Background(), "Start sending SetBrutal frame with speed: ", sendBPS)
+	err := m.link.Writer.WriteMultiBuffer(buf.MultiBuffer{frame})
+	if err != nil {
+		frame.Release()
+		errors.LogError(context.Background(), "failed to send SetBrutal frame: ", err)
+	}
+	errors.LogInfo(context.Background(), "SetBrutal frame sent successfully")
+}

+ 2 - 0
common/mux/frame.go

@@ -21,6 +21,8 @@ const (
 	SessionStatusKeep      SessionStatus = 0x02
 	SessionStatusEnd       SessionStatus = 0x03
 	SessionStatusKeepAlive SessionStatus = 0x04
+
+	SessionStatusSetBrutal SessionStatus = 0x91
 )
 
 const (

+ 38 - 0
common/mux/server.go

@@ -2,6 +2,7 @@ package mux
 
 import (
 	"context"
+	"encoding/binary"
 	"io"
 	"time"
 
@@ -16,6 +17,7 @@ import (
 	"github.com/xtls/xray-core/core"
 	"github.com/xtls/xray-core/features/routing"
 	"github.com/xtls/xray-core/transport"
+	"github.com/xtls/xray-core/transport/internet/brutal"
 	"github.com/xtls/xray-core/transport/pipe"
 )
 
@@ -333,6 +335,40 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered
 	return nil
 }
 
+// note: do not return error, just log it
+func (w *ServerWorker) HandleSetBrutal(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if meta.Option.Has(OptionData) == false {
+		errors.LogError(ctx, "SetBrutal frame missing data")
+		return nil
+	}
+	chunkReader := NewStreamReader(reader)
+	data, err := chunkReader.ReadMultiBuffer()
+	if err != nil {
+		errors.LogError(ctx, "unexpected error when reading brutal data: ", err)
+	}
+	speed := binary.BigEndian.Uint64(data.Bytes())
+
+	inbound := session.InboundFromContext(ctx)
+	if inbound == nil || inbound.Conn == nil {
+		errors.LogError(ctx, "no inbound connection found for brutal set")
+		return nil
+	}
+	conn := inbound.Conn
+	tcpConn, ok := conn.(*net.TCPConn)
+	if !ok {
+		errors.LogError(ctx, "brutal can only be set on TCP connections")
+		return nil
+	}
+	err = brutal.SetBrutal(tcpConn, speed)
+	if err != nil {
+		errors.LogError(ctx, "failed to set brutal: ", err)
+		return nil
+	} else {
+		errors.LogInfo(ctx, "successfully set brutal speed: ", speed)
+	}
+	return nil
+}
+
 func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedReader) error {
 	var meta FrameMetadata
 	err := meta.Unmarshal(reader, session.IsReverseMuxFromContext(ctx))
@@ -349,6 +385,8 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead
 		err = w.handleStatusNew(session.ContextWithIsReverseMux(ctx, false), &meta, reader)
 	case SessionStatusKeep:
 		err = w.handleStatusKeep(&meta, reader)
+	case SessionStatusSetBrutal:
+		err = w.HandleSetBrutal(ctx, &meta, reader)
 	default:
 		status := meta.SessionStatus
 		return errors.New("unknown status: ", status).AtError()

+ 2 - 0
infra/conf/xray.go

@@ -102,6 +102,7 @@ func (c *SniffingConfig) Build() (*proxyman.SniffingConfig, error) {
 type MuxConfig struct {
 	Enabled         bool   `json:"enabled"`
 	Concurrency     int16  `json:"concurrency"`
+	BrutalBPS       uint64 `json:"brutalBPS"`
 	XudpConcurrency int16  `json:"xudpConcurrency"`
 	XudpProxyUDP443 string `json:"xudpProxyUDP443"`
 }
@@ -118,6 +119,7 @@ func (m *MuxConfig) Build() (*proxyman.MultiplexingConfig, error) {
 	return &proxyman.MultiplexingConfig{
 		Enabled:         m.Enabled,
 		Concurrency:     int32(m.Concurrency),
+		BrutalBPS:       m.BrutalBPS,
 		XudpConcurrency: int32(m.XudpConcurrency),
 		XudpProxyUDP443: m.XudpProxyUDP443,
 	}, nil