Explorar o código

Refactor: FullCone TPROXY Inbound & Socks Outbound

https://t.me/projectXray/116037
RPRX %!s(int64=4) %!d(string=hai) anos
pai
achega
2da07e0f8a

+ 2 - 1
app/proxyman/inbound/worker.go

@@ -279,7 +279,8 @@ func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest
 		src: source,
 	}
 	if originalDest.IsValid() {
-		id.dest = originalDest
+		//id.dest = originalDest
+		b.UDP = &originalDest
 	}
 	conn, existing := w.getConnection(id)
 

+ 81 - 15
proxy/dokodemo/dokodemo.go

@@ -173,26 +173,30 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 			if d.sockopt != nil {
 				sockopt.Mark = d.sockopt.Mark
 			}
-			tConn, err := internet.DialSystem(ctx, net.DestinationFromAddr(conn.RemoteAddr()), sockopt)
+			to := net.DestinationFromAddr(conn.RemoteAddr())
+			tConn, err := internet.DialSystem(ctx, to, sockopt)
 			if err != nil {
 				return err
 			}
-			defer tConn.Close()
-
-			writer = &buf.SequentialWriter{Writer: tConn}
-			tReader := buf.NewPacketReader(tConn)
-			requestCount++
-			tproxyRequest = func() error {
-				defer func() {
-					if atomic.AddInt32(&requestCount, -1) == 0 {
-						timer.SetTimeout(plcy.Timeouts.DownlinkOnly)
+			writer = NewPacketWriter(tConn, &dest, ctx, &to, sockopt)
+			defer writer.(*PacketWriter).Close()
+			/*
+				defer tConn.Close()
+				writer = &buf.SequentialWriter{Writer: tConn}
+				tReader := buf.NewPacketReader(tConn)
+				requestCount++
+				tproxyRequest = func() error {
+					defer func() {
+						if atomic.AddInt32(&requestCount, -1) == 0 {
+							timer.SetTimeout(plcy.Timeouts.DownlinkOnly)
+						}
+					}()
+					if err := buf.Copy(tReader, link.Writer, buf.UpdateActivity(timer)); err != nil {
+						return newError("failed to transport request (TPROXY conn)").Base(err)
 					}
-				}()
-				if err := buf.Copy(tReader, link.Writer, buf.UpdateActivity(timer)); err != nil {
-					return newError("failed to transport request (TPROXY conn)").Base(err)
+					return nil
 				}
-				return nil
-			}
+			*/
 		}
 	}
 
@@ -215,3 +219,65 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 
 	return nil
 }
+
+func NewPacketWriter(conn net.Conn, d *net.Destination, ctx context.Context, to *net.Destination, sockopt *internet.SocketConfig) buf.Writer {
+	writer := &PacketWriter{
+		conn:    conn,
+		conns:   make(map[net.Destination]net.Conn),
+		ctx:     ctx,
+		to:      to,
+		sockopt: sockopt,
+	}
+	writer.conns[*d] = conn
+	return writer
+}
+
+type PacketWriter struct {
+	conn    net.Conn
+	conns   map[net.Destination]net.Conn
+	ctx     context.Context
+	to      *net.Destination
+	sockopt *internet.SocketConfig
+}
+
+func (w *PacketWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
+	for {
+		mb2, b := buf.SplitFirst(mb)
+		mb = mb2
+		if b == nil {
+			break
+		}
+		var err error
+		if b.UDP != nil && b.UDP.Address.Family().IsIP() {
+			conn := w.conns[*b.UDP]
+			if conn == nil {
+				w.sockopt.BindAddress = b.UDP.Address.IP()
+				w.sockopt.BindPort = uint32(b.UDP.Port)
+				conn, _ = internet.DialSystem(w.ctx, *w.to, w.sockopt)
+				if conn == nil {
+					b.Release()
+					continue
+				}
+				w.conns[*b.UDP] = conn
+			}
+			_, err = conn.Write(b.Bytes())
+		} else {
+			_, err = w.conn.Write(b.Bytes())
+		}
+		b.Release()
+		if err != nil {
+			buf.ReleaseMulti(mb)
+			return err
+		}
+	}
+	return nil
+}
+
+func (w *PacketWriter) Close() error {
+	for _, conn := range w.conns {
+		if conn != nil {
+			conn.Close()
+		}
+	}
+	return nil
+}

+ 3 - 2
proxy/socks/client.go

@@ -138,11 +138,12 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
 		defer udpConn.Close()
 		requestFunc = func() error {
 			defer timer.SetTimeout(p.Timeouts.DownlinkOnly)
-			return buf.Copy(link.Reader, &buf.SequentialWriter{Writer: NewUDPWriter(request, udpConn)}, buf.UpdateActivity(timer))
+			writer := &UDPWriter{Writer: udpConn, Request: request}
+			return buf.Copy(link.Reader, writer, buf.UpdateActivity(timer))
 		}
 		responseFunc = func() error {
 			defer timer.SetTimeout(p.Timeouts.UplinkOnly)
-			reader := &UDPReader{reader: udpConn}
+			reader := &UDPReader{Reader: udpConn}
 			return buf.Copy(reader, link.Writer, buf.UpdateActivity(timer))
 		}
 	}

+ 40 - 28
proxy/socks/protocol.go

@@ -360,47 +360,59 @@ func EncodeUDPPacket(request *protocol.RequestHeader, data []byte) (*buf.Buffer,
 }
 
 type UDPReader struct {
-	reader io.Reader
-}
-
-func NewUDPReader(reader io.Reader) *UDPReader {
-	return &UDPReader{reader: reader}
+	Reader io.Reader
 }
 
 func (r *UDPReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
-	b := buf.New()
-	if _, err := b.ReadFrom(r.reader); err != nil {
+	buffer := buf.New()
+	_, err := buffer.ReadFrom(r.Reader)
+	if err != nil {
+		buffer.Release()
 		return nil, err
 	}
-	if _, err := DecodeUDPPacket(b); err != nil {
+	u, err := DecodeUDPPacket(buffer)
+	if err != nil {
+		buffer.Release()
 		return nil, err
 	}
-	return buf.MultiBuffer{b}, nil
+	dest := u.Destination()
+	buffer.UDP = &dest
+	return buf.MultiBuffer{buffer}, nil
 }
 
 type UDPWriter struct {
-	request *protocol.RequestHeader
-	writer  io.Writer
-}
-
-func NewUDPWriter(request *protocol.RequestHeader, writer io.Writer) *UDPWriter {
-	return &UDPWriter{
-		request: request,
-		writer:  writer,
-	}
+	Writer  io.Writer
+	Request *protocol.RequestHeader
 }
 
-// Write implements io.Writer.
-func (w *UDPWriter) Write(b []byte) (int, error) {
-	eb, err := EncodeUDPPacket(w.request, b)
-	if err != nil {
-		return 0, err
-	}
-	defer eb.Release()
-	if _, err := w.writer.Write(eb.Bytes()); err != nil {
-		return 0, err
+func (w *UDPWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
+	for {
+		mb2, b := buf.SplitFirst(mb)
+		mb = mb2
+		if b == nil {
+			break
+		}
+		request := w.Request
+		if b.UDP != nil {
+			request = &protocol.RequestHeader{
+				Address: b.UDP.Address,
+				Port:    b.UDP.Port,
+			}
+		}
+		packet, err := EncodeUDPPacket(request, b.Bytes())
+		b.Release()
+		if err != nil {
+			buf.ReleaseMulti(mb)
+			return err
+		}
+		_, err = w.Writer.Write(packet.Bytes())
+		packet.Release()
+		if err != nil {
+			buf.ReleaseMulti(mb)
+			return err
+		}
 	}
-	return len(b), nil
+	return nil
 }
 
 func ClientHandshake(request *protocol.RequestHeader, reader io.Reader, writer io.Writer) (*protocol.RequestHeader, error) {

+ 2 - 2
proxy/socks/protocol_test.go

@@ -20,14 +20,14 @@ func TestUDPEncoding(t *testing.T) {
 		Address: net.IPAddress([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6}),
 		Port:    1024,
 	}
-	writer := &buf.SequentialWriter{Writer: NewUDPWriter(request, b)}
+	writer := &UDPWriter{Writer: b, Request: request}
 
 	content := []byte{'a'}
 	payload := buf.New()
 	payload.Write(content)
 	common.Must(writer.WriteMultiBuffer(buf.MultiBuffer{payload}))
 
-	reader := NewUDPReader(b)
+	reader := &UDPReader{Reader: b}
 
 	decodedPayload, err := reader.ReadMultiBuffer()
 	common.Must(err)